blob: e765d6b1e975cae1a4a1ad8cfd90a2530511ec3c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.io.*;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
/**
* A utility class for I/O-related functionality.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class IOUtils {
public static final Logger LOG = LoggerFactory.getLogger(IOUtils.class);
/**
 * Copies every byte from one stream to another, optionally closing both
 * streams afterwards.
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param buffSize the size of the buffer
 * @param close whether to close the InputStream and OutputStream at the
 *          end. When true, both streams are closed even if the copy fails;
 *          failures raised by that fallback cleanup are ignored.
 * @throws IOException if copying fails, or if closing a stream fails after
 *           a successful copy
 */
public static void copyBytes(InputStream in, OutputStream out,
    int buffSize, boolean close) throws IOException {
  if (!close) {
    // The caller keeps ownership of both streams.
    copyBytes(in, out, buffSize);
    return;
  }
  try {
    copyBytes(in, out, buffSize);
    // Close explicitly on the success path so close() failures surface,
    // and clear each reference so the finally block skips it.
    out.close();
    out = null;
    in.close();
    in = null;
  } finally {
    // Whatever is still non-null here was not closed cleanly above.
    closeStream(out);
    closeStream(in);
  }
}
/**
 * Copies from one stream to another until the input reaches EOF.
 * Neither stream is closed by this method.
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param buffSize the size of the buffer; must be positive
 * @throws IllegalArgumentException if buffSize is not positive
 * @throws IOException if reading or writing fails, or if out is a
 *           PrintStream that reports an error after a write
 */
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
    throws IOException {
  if (buffSize <= 0) {
    // With a zero-length buffer read(buf) returns 0 without consuming any
    // input, so the copy loop below would never reach EOF.
    throw new IllegalArgumentException(
        "Buffer size must be positive: " + buffSize);
  }
  // PrintStream does not throw on write failures; its error flag has to be
  // polled after each write instead.
  final PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
  final byte[] buf = new byte[buffSize];
  int bytesRead;
  while ((bytesRead = in.read(buf)) >= 0) {
    out.write(buf, 0, bytesRead);
    if ((ps != null) && ps.checkError()) {
      throw new IOException("Unable to write to output stream.");
    }
  }
}
/**
 * Copies from one stream to another, using the buffer size configured
 * under {@code IO_FILE_BUFFER_SIZE_KEY} (falling back to
 * {@code IO_FILE_BUFFER_SIZE_DEFAULT}). <strong>Closes the input and
 * output streams at the end</strong>.
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param conf the Configuration object supplying the buffer size
 * @throws IOException if copying or closing the streams fails
 */
public static void copyBytes(InputStream in, OutputStream out, Configuration conf)
    throws IOException {
  final int bufferSize =
      conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
  copyBytes(in, out, bufferSize, true);
}
/**
 * Copies from one stream to another, using the buffer size configured
 * under {@code IO_FILE_BUFFER_SIZE_KEY} (falling back to
 * {@code IO_FILE_BUFFER_SIZE_DEFAULT}).
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param conf the Configuration object supplying the buffer size
 * @param close whether to close the InputStream and OutputStream at the
 *          end
 * @throws IOException if copying (or, when requested, closing) fails
 */
public static void copyBytes(InputStream in, OutputStream out, Configuration conf, boolean close)
    throws IOException {
  final int bufferSize =
      conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
  copyBytes(in, out, bufferSize, close);
}
/**
 * Copies up to {@code count} bytes from one stream to another. Copying
 * stops early, without an error, if the input reaches EOF first.
 *
 * @param in InputStream to read from
 * @param out OutputStream to write to
 * @param count number of bytes to copy
 * @param close whether to close the streams; when true they are closed
 *          even if the copy fails
 * @throws IOException if bytes can not be read or written
 */
public static void copyBytes(InputStream in, OutputStream out, long count,
    boolean close) throws IOException {
  final byte[] chunk = new byte[4096];
  try {
    for (long left = count; left > 0;) {
      // Never request more than what is still owed or what fits the buffer.
      final int want = (int) Math.min(left, chunk.length);
      final int got = in.read(chunk, 0, want);
      if (got == -1) {
        break;
      }
      out.write(chunk, 0, got);
      left -= got;
    }
    if (close) {
      // Close explicitly so close() failures are reported; clear the
      // references so the finally block does not close them again.
      out.close();
      out = null;
      in.close();
      in = null;
    }
  } finally {
    if (close) {
      closeStream(out);
      closeStream(in);
    }
  }
}
/**
 * Reads from an {@link InputStream}, converting any non-IO failure raised
 * by the underlying stream (for example a decompression error) into an
 * IOException. IOExceptions are passed through unchanged.
 *
 * @param is - InputStream to be read from
 * @param buf - buffer the data is read into
 * @param off - offset within buf
 * @param len - amount of data to be read
 * @return number of bytes read
 * @throws IOException the stream's own IOException, or a new one wrapping
 *           whatever else the read threw
 */
public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
    int off, int len) throws IOException {
  try {
    return is.read(buf, off, len);
  } catch (Throwable failure) {
    if (failure instanceof IOException) {
      // Already the right type: rethrow the very same instance.
      throw (IOException) failure;
    }
    throw new IOException("Error while reading compressed data", failure);
  }
}
/**
 * Reads exactly {@code len} bytes, looping over short reads.
 *
 * @param in InputStream to read from
 * @param buf The buffer to fill
 * @param off offset in the buffer at which the first byte is stored
 * @param len the number of bytes to read
 * @throws IOException if it could not read requested number of bytes
 *           for any reason (including EOF)
 */
public static void readFully(InputStream in, byte[] buf,
    int off, int len) throws IOException {
  // 'done' counts the bytes stored so far; each pass asks for the rest.
  for (int done = 0; done < len;) {
    final int n = in.read(buf, off + done, len - done);
    if (n < 0) {
      throw new IOException( "Premature EOF from inputStream");
    }
    done += n;
  }
}
/**
 * Skips exactly {@code len} bytes, looping over short skips in the same
 * way readFully() loops over short reads.
 *
 * @param in The InputStream to skip bytes from
 * @param len number of bytes to skip.
 * @throws IOException if it could not skip requested number of bytes
 *           for any reason (including EOF)
 */
public static void skipFully(InputStream in, long len) throws IOException {
  long remaining = len;
  while (remaining > 0) {
    final long skipped = in.skip(remaining);
    if (skipped != 0) {
      remaining -= skipped;
      continue;
    }
    // skip() may return 0 without being at EOF, so probe with a
    // single-byte read: it either consumes one byte or reports EOF.
    if (in.read() == -1) {
      throw new EOFException( "Premature EOF from inputStream after " +
          "skipping " + (len - remaining) + " byte(s).");
    }
    remaining--;
  }
}
/**
 * Closes each of the given Closeables, <b>ignoring</b> null entries and
 * any {@link Throwable} thrown by close(). Must only be used for cleanup
 * in exception handlers.
 *
 * @param log the log to record problems to at debug level. Can be null.
 * @param closeables the objects to close
 * @deprecated use {@link #cleanupWithLogger(Logger, java.io.Closeable...)}
 * instead
 */
@Deprecated
public static void cleanup(Log log, java.io.Closeable... closeables) {
  for (java.io.Closeable c : closeables) {
    if (c == null) {
      continue;
    }
    try {
      c.close();
    } catch(Throwable e) {
      // Best effort: report at debug level when possible, never rethrow.
      final boolean report = log != null && log.isDebugEnabled();
      if (report) {
        log.debug("Exception in closing " + c, e);
      }
    }
  }
}
/**
 * Closes each of the given Closeables, <b>ignoring</b> null entries and
 * any {@link Throwable} thrown by close(). Must only be used for cleanup
 * in exception handlers.
 *
 * @param logger the log to record problems to at debug level. Can be null.
 * @param closeables the objects to close
 */
public static void cleanupWithLogger(Logger logger,
    java.io.Closeable... closeables) {
  for (java.io.Closeable c : closeables) {
    if (c == null) {
      continue;
    }
    try {
      c.close();
    } catch (Throwable e) {
      // Best effort: report when a logger was supplied, never rethrow.
      if (logger == null) {
        continue;
      }
      logger.debug("Exception in closing {}", c, e);
    }
  }
}
/**
 * Closes the stream, ignoring any {@link Throwable}; a null stream is a
 * no-op. Must only be called in cleaning up from exception handlers.
 *
 * @param stream the Stream to close
 */
public static void closeStream(java.io.Closeable stream) {
  if (stream == null) {
    return;
  }
  // No logger is passed, so close failures are dropped silently.
  cleanupWithLogger(null, stream);
}
/**
 * Closes the streams, ignoring any {@link Throwable}; a null array is a
 * no-op. Must only be called in cleaning up from exception handlers.
 *
 * @param streams the Streams to close
 */
public static void closeStreams(java.io.Closeable... streams) {
  if (streams == null) {
    return;
  }
  // No logger is passed, so close failures are dropped silently.
  cleanupWithLogger(null, streams);
}
/**
 * Closes the socket, ignoring any {@link IOException} other than logging
 * it at debug level. A null socket is a no-op.
 *
 * @param sock the Socket to close
 */
public static void closeSocket(Socket sock) {
  if (sock == null) {
    return;
  }
  try {
    sock.close();
  } catch (IOException ignored) {
    // Deliberately not propagated: closing is best effort here.
    LOG.debug("Ignoring exception while closing socket", ignored);
  }
}
/**
 * An OutputStream that discards everything written to it: the /dev/null
 * of OutputStreams.
 */
public static class NullOutputStream extends OutputStream {
  @Override
  public void write(int b) throws IOException {
    // Intentionally empty: the byte is dropped.
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    // Intentionally empty: the bytes are dropped and the arguments are
    // not inspected.
  }
}
/**
 * Writes a ByteBuffer to a WritableByteChannel, repeating the write until
 * the buffer is drained. The channel is written to at least once, even if
 * the buffer is already empty.
 *
 * @param bc The WritableByteChannel to write to
 * @param buf The input buffer
 * @throws IOException On I/O error
 */
public static void writeFully(WritableByteChannel bc, ByteBuffer buf)
    throws IOException {
  bc.write(buf);
  // A channel may accept only part of the buffer per call; keep going.
  while (buf.hasRemaining()) {
    bc.write(buf);
  }
}
/**
 * Writes a ByteBuffer to a FileChannel starting at a given file offset,
 * repeating the write until the buffer is drained. Each follow-up write
 * starts where the previous one stopped.
 *
 * @param fc The FileChannel to write to
 * @param buf The input buffer
 * @param offset The offset in the file to start writing at
 * @throws IOException On I/O error
 */
public static void writeFully(FileChannel fc, ByteBuffer buf,
    long offset) throws IOException {
  long position = offset;
  do {
    final int written = fc.write(buf, position);
    position += written;
  } while (buf.hasRemaining());
}
/**
 * Returns the names of all entries in a directory as strings.
 * <p>
 * Unlike {@link File#list()}, I/O problems hit while listing are reported
 * as IOExceptions rather than being ignored.
 *
 * @param dir The directory to list.
 * @param filter If non-null, only names accepted by this filter are
 *          returned.
 * @return The list of entry names in the directory.
 *
 * @throws IOException On I/O error
 */
public static List<String> listDirectory(File dir, FilenameFilter filter)
    throws IOException {
  final List<String> names = new ArrayList<>();
  try (DirectoryStream<Path> entries =
           Files.newDirectoryStream(dir.toPath())) {
    for (Path entry : entries) {
      final String name = entry.getFileName().toString();
      if (filter != null && !filter.accept(dir, name)) {
        continue;
      }
      names.add(name);
    }
  } catch (DirectoryIteratorException e) {
    // Iteration wraps I/O failures in an unchecked exception; unwrap it so
    // callers see the underlying IOException.
    throw e.getCause();
  }
  return names;
}
/**
 * Ensure that any writes to the given file is written to the storage device
 * that contains it. This method opens channel on given File and closes it
 * once the sync is done.<br>
 * Borrowed from Uwe Schindler in LUCENE-5588
 * @param fileToSync the file to fsync
 * @throws FileNotFoundException if fileToSync does not exist
 * @throws IOException if the channel cannot be opened or synced
 */
public static void fsync(File fileToSync) throws IOException {
  if (!fileToSync.exists()) {
    throw new FileNotFoundException(
        "File/Directory " + fileToSync.getAbsolutePath() + " does not exist");
  }
  final boolean isDir = fileToSync.isDirectory();

  // HDFS-13586, FileChannel.open fails with AccessDeniedException
  // for any directory, ignore.
  if (isDir && Shell.WINDOWS) {
    return;
  }

  // If the file is a directory we have to open read-only, for regular files
  // we must open r/w for the fsync to have an effect. See
  // http://blog.httrack.com/blog/2013/11/15/
  // everything-you-always-wanted-to-know-about-fsync/
  final StandardOpenOption mode =
      isDir ? StandardOpenOption.READ : StandardOpenOption.WRITE;
  try (FileChannel channel = FileChannel.open(fileToSync.toPath(), mode)) {
    fsync(channel, isDir);
  }
}
/**
 * Ensure that any writes made through the given channel are written to the
 * storage device that contains the file. The channel is neither opened nor
 * closed here; that is left to the caller.
 * Borrowed from Uwe Schindler in LUCENE-5588
 * @param channel Channel to sync
 * @param isDir if true, the given file is a directory (Channel should be
 *          opened for read and ignore IOExceptions, because not all file
 *          systems and operating systems allow to fsync on a directory)
 * @throws IOException if forcing the channel fails and isDir is false
 */
public static void fsync(FileChannel channel, boolean isDir)
    throws IOException {
  try {
    channel.force(true);
  } catch (IOException ioe) {
    if (!isDir) {
      // Regular file: the failure is real, rethrow the original exception.
      throw ioe;
    }
    // Directory: tolerated in production, but flagged when assertions are
    // enabled on platforms where it is not expected to happen.
    assert !(Shell.LINUX
        || Shell.MAC) : "On Linux and MacOSX fsyncing a directory"
            + " should not throw IOException, we just don't want to rely"
            + " on that in production (undocumented)" + ". Got: " + ioe;
  }
}
/**
 * Drains a DataInput byte by byte until it signals EOF and returns
 * everything read as a byte array. Make sure not to pass in an infinite
 * DataInput or this will never return.
 *
 * @param in A DataInput
 * @return a byte array containing the data from the DataInput
 * @throws IOException on I/O error, other than EOF
 */
public static byte[] readFullyToByteArray(DataInput in) throws IOException {
  final ByteArrayOutputStream collected = new ByteArrayOutputStream();
  boolean exhausted = false;
  while (!exhausted) {
    try {
      collected.write(in.readByte());
    } catch (EOFException eof) {
      // EOF is the normal termination signal here, not an error.
      exhausted = true;
    }
  }
  return collected.toByteArray();
}
}