| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.flink.util; |
| |
| import org.apache.flink.core.fs.FSDataInputStream; |
| import org.apache.flink.core.fs.FSDataOutputStream; |
| import org.apache.flink.core.fs.FileStatus; |
| import org.apache.flink.core.fs.FileSystem; |
| import org.apache.flink.core.fs.Path; |
| import org.apache.flink.util.function.ThrowingConsumer; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.Channels; |
| import java.nio.channels.SeekableByteChannel; |
| import java.nio.channels.WritableByteChannel; |
| import java.nio.file.AccessDeniedException; |
| import java.nio.file.Files; |
| import java.nio.file.StandardOpenOption; |
| import java.util.Arrays; |
| import java.util.Random; |
| |
| import static org.apache.flink.util.Preconditions.checkNotNull; |
| |
| /** |
| * This is a utility class to deal files and directories. Contains utilities for recursive |
| * deletion and creation of temporary files. |
| */ |
| public final class FileUtils { |
| |
| /** Global lock to prevent concurrent directory deletes under Windows. */ |
| private static final Object WINDOWS_DELETE_LOCK = new Object(); |
| |
| /** The alphabet to construct the random part of the filename from. */ |
| private static final char[] ALPHABET = |
| { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'a', 'b', 'c', 'd', 'e', 'f' }; |
| |
| /** The length of the random part of the filename. */ |
| private static final int RANDOM_FILE_NAME_LENGTH = 12; |
| |
| /** The maximum size of array to allocate. More see {@link Files#MAX_BUFFER_SIZE}. */ |
| private static final int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; |
| |
| // buffer size used for reading and writing |
| private static final int BUFFER_SIZE = 8192; |
| |
| // ------------------------------------------------------------------------ |
| |
| public static void writeCompletely(WritableByteChannel channel, ByteBuffer src) throws IOException { |
| while (src.hasRemaining()) { |
| channel.write(src); |
| } |
| } |
| |
| /** |
| * Constructs a random filename with the given prefix and |
| * a random part generated from hex characters. |
| * |
| * @param prefix |
| * the prefix to the filename to be constructed |
| * @return the generated random filename with the given prefix |
| */ |
| public static String getRandomFilename(final String prefix) { |
| final Random rnd = new Random(); |
| final StringBuilder stringBuilder = new StringBuilder(prefix); |
| |
| for (int i = 0; i < RANDOM_FILE_NAME_LENGTH; i++) { |
| stringBuilder.append(ALPHABET[rnd.nextInt(ALPHABET.length)]); |
| } |
| |
| return stringBuilder.toString(); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Simple reading and writing of files |
| // ------------------------------------------------------------------------ |
| |
| public static String readFile(File file, String charsetName) throws IOException { |
| byte[] bytes = Files.readAllBytes(file.toPath()); |
| return new String(bytes, charsetName); |
| } |
| |
| public static String readFileUtf8(File file) throws IOException { |
| return readFile(file, "UTF-8"); |
| } |
| |
| public static void writeFile(File file, String contents, String encoding) throws IOException { |
| byte[] bytes = contents.getBytes(encoding); |
| Files.write(file.toPath(), bytes, StandardOpenOption.WRITE); |
| } |
| |
| public static void writeFileUtf8(File file, String contents) throws IOException { |
| writeFile(file, contents, "UTF-8"); |
| } |
| |
| /** |
| * Reads all the bytes from a file. Uses {@code directBufferSize} to limit |
| * the size of the direct buffer used to read. |
| * |
| * @param path |
| * the path to the file |
| * @param directBufferSize |
| * the size of direct buffer used to read |
| * @return a byte array containing the bytes read from the file |
| * @throws IOException |
| * if an I/O error occurs reading from the stream |
| */ |
| public static byte[] readAllBytes(java.nio.file.Path path, int directBufferSize) throws IOException { |
| try (SeekableByteChannel channel = Files.newByteChannel(path); |
| InputStream in = Channels.newInputStream(channel)) { |
| long size = channel.size(); |
| if (size > (long) MAX_BUFFER_SIZE) { |
| throw new OutOfMemoryError("Required array size too large"); |
| } |
| |
| return read(in, (int) size, directBufferSize); |
| } |
| } |
| |
| /** |
| * Reads all the bytes from an input stream. Uses {@code initialSize} as a hint |
| * about how many bytes the stream will have and uses {@code directBufferSize} |
| * to limit the size of the direct buffer used to read. |
| * |
| * @param source |
| * the input stream to read from |
| * @param initialSize |
| * the initial size of the byte array to allocate, use default {@link #BUFFER_SIZE} |
| * if <= 0 |
| * @param directBufferSize |
| * @return a byte array containing the bytes read from the file |
| * @throws IOException |
| * if an I/O error occurs reading from the stream |
| */ |
| public static byte[] read(InputStream source, int initialSize, int directBufferSize) throws IOException { |
| int capacity = initialSize; |
| byte[] buf = new byte[capacity]; |
| int nread = 0; |
| int n; |
| directBufferSize = (directBufferSize <= 0) ? BUFFER_SIZE : directBufferSize; |
| for (; ;) { |
| // read to EOF which may read more or less than initialSize (eg: file |
| // is truncated while we are reading) |
| while ((n = source.read(buf, nread, Math.min(capacity - nread, directBufferSize))) > 0) { |
| nread += n; |
| } |
| |
| // if last call to source.read() returned -1, we are done |
| // otherwise, try to read one more byte; if that failed we're done too |
| if (n < 0 || (n = source.read()) < 0) { |
| break; |
| } |
| |
| // one more byte was read; need to allocate a larger buffer |
| if (capacity <= MAX_BUFFER_SIZE - capacity) { |
| capacity = Math.max(capacity << 1, BUFFER_SIZE); |
| } else { |
| if (capacity == MAX_BUFFER_SIZE) { |
| throw new OutOfMemoryError("Required array size too large"); |
| } |
| capacity = MAX_BUFFER_SIZE; |
| } |
| buf = Arrays.copyOf(buf, capacity); |
| buf[nread++] = (byte) n; |
| } |
| return (capacity == nread) ? buf : Arrays.copyOf(buf, nread); |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Deleting directories on standard File Systems |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Removes the given file or directory recursively. |
| * |
| * <p>If the file or directory does not exist, this does not throw an exception, but simply does nothing. |
| * It considers the fact that a file-to-be-deleted is not present a success. |
| * |
| * <p>This method is safe against other concurrent deletion attempts. |
| * |
| * @param file The file or directory to delete. |
| * |
| * @throws IOException Thrown if the directory could not be cleaned for some reason, for example |
| * due to missing access/write permissions. |
| */ |
| public static void deleteFileOrDirectory(File file) throws IOException { |
| checkNotNull(file, "file"); |
| |
| guardIfWindows(FileUtils::deleteFileOrDirectoryInternal, file); |
| } |
| |
| /** |
| * Deletes the given directory recursively. |
| * |
| * <p>If the directory does not exist, this does not throw an exception, but simply does nothing. |
| * It considers the fact that a directory-to-be-deleted is not present a success. |
| * |
| * <p>This method is safe against other concurrent deletion attempts. |
| * |
| * @param directory The directory to be deleted. |
| * @throws IOException Thrown if the given file is not a directory, or if the directory could not be |
| * deleted for some reason, for example due to missing access/write permissions. |
| */ |
| public static void deleteDirectory(File directory) throws IOException { |
| checkNotNull(directory, "directory"); |
| |
| guardIfWindows(FileUtils::deleteDirectoryInternal, directory); |
| } |
| |
| /** |
| * Deletes the given directory recursively, not reporting any I/O exceptions |
| * that occur. |
| * |
| * <p>This method is identical to {@link FileUtils#deleteDirectory(File)}, except that it |
| * swallows all exceptions and may leave the job quietly incomplete. |
| * |
| * @param directory The directory to delete. |
| */ |
| public static void deleteDirectoryQuietly(File directory) { |
| if (directory == null) { |
| return; |
| } |
| |
| // delete and do not report if it fails |
| try { |
| deleteDirectory(directory); |
| } catch (Exception ignored) {} |
| } |
| |
| /** |
| * Removes all files contained within a directory, without removing the directory itself. |
| * |
| * <p>This method is safe against other concurrent deletion attempts. |
| * |
| * @param directory The directory to remove all files from. |
| * |
| * @throws FileNotFoundException Thrown if the directory itself does not exist. |
| * @throws IOException Thrown if the file indicates a proper file and not a directory, or if |
| * the directory could not be cleaned for some reason, for example |
| * due to missing access/write permissions. |
| */ |
| public static void cleanDirectory(File directory) throws IOException { |
| checkNotNull(directory, "directory"); |
| |
| guardIfWindows(FileUtils::cleanDirectoryInternal, directory); |
| } |
| |
| private static void deleteFileOrDirectoryInternal(File file) throws IOException { |
| if (file.isDirectory()) { |
| // file exists and is directory |
| deleteDirectoryInternal(file); |
| } |
| else { |
| // if the file is already gone (concurrently), we don't mind |
| Files.deleteIfExists(file.toPath()); |
| } |
| // else: already deleted |
| } |
| |
| private static void deleteDirectoryInternal(File directory) throws IOException { |
| if (directory.isDirectory()) { |
| // directory exists and is a directory |
| |
| // empty the directory first |
| try { |
| cleanDirectoryInternal(directory); |
| } |
| catch (FileNotFoundException ignored) { |
| // someone concurrently deleted the directory, nothing to do for us |
| return; |
| } |
| |
| // delete the directory. this fails if the directory is not empty, meaning |
| // if new files got concurrently created. we want to fail then. |
| // if someone else deleted the empty directory concurrently, we don't mind |
| // the result is the same for us, after all |
| Files.deleteIfExists(directory.toPath()); |
| } |
| else if (directory.exists()) { |
| // exists but is file, not directory |
| // either an error from the caller, or concurrently a file got created |
| throw new IOException(directory + " is not a directory"); |
| } |
| // else: does not exist, which is okay (as if deleted) |
| } |
| |
| private static void cleanDirectoryInternal(File directory) throws IOException { |
| if (directory.isDirectory()) { |
| final File[] files = directory.listFiles(); |
| |
| if (files == null) { |
| // directory does not exist any more or no permissions |
| if (directory.exists()) { |
| throw new IOException("Failed to list contents of " + directory); |
| } else { |
| throw new FileNotFoundException(directory.toString()); |
| } |
| } |
| |
| // remove all files in the directory |
| for (File file : files) { |
| if (file != null) { |
| deleteFileOrDirectory(file); |
| } |
| } |
| } |
| else if (directory.exists()) { |
| throw new IOException(directory + " is not a directory but a regular file"); |
| } |
| else { |
| // else does not exist at all |
| throw new FileNotFoundException(directory.toString()); |
| } |
| } |
| |
| private static void guardIfWindows(ThrowingConsumer<File, IOException> toRun, File file) throws IOException { |
| if (!OperatingSystem.isWindows()) { |
| toRun.accept(file); |
| } |
| else { |
| // for windows, we synchronize on a global lock, to prevent concurrent delete issues |
| // > |
| // in the future, we may want to find either a good way of working around file visibility |
| // in Windows under concurrent operations (the behavior seems completely unpredictable) |
| // or make this locking more fine grained, for example on directory path prefixes |
| synchronized (WINDOWS_DELETE_LOCK) { |
| for (int attempt = 1; attempt <= 10; attempt++) { |
| try { |
| toRun.accept(file); |
| break; |
| } |
| catch (AccessDeniedException e) { |
| // ah, windows... |
| } |
| |
| // briefly wait and fall through the loop |
| try { |
| Thread.sleep(1); |
| } catch (InterruptedException e) { |
| // restore the interruption flag and error out of the method |
| Thread.currentThread().interrupt(); |
| throw new IOException("operation interrupted"); |
| } |
| } |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| // Deleting directories on Flink FileSystem abstraction |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Deletes the path if it is empty. A path can only be empty if it is a directory which does |
| * not contain any other directories/files. |
| * |
| * @param fileSystem to use |
| * @param path to be deleted if empty |
| * @return true if the path could be deleted; otherwise false |
| * @throws IOException if the delete operation fails |
| */ |
| public static boolean deletePathIfEmpty(FileSystem fileSystem, Path path) throws IOException { |
| final FileStatus[] fileStatuses; |
| |
| try { |
| fileStatuses = fileSystem.listStatus(path); |
| } |
| catch (FileNotFoundException e) { |
| // path already deleted |
| return true; |
| } |
| catch (Exception e) { |
| // could not access directory, cannot delete |
| return false; |
| } |
| |
| // if there are no more files or if we couldn't list the file status try to delete the path |
| if (fileStatuses == null) { |
| // another indicator of "file not found" |
| return true; |
| } |
| else if (fileStatuses.length == 0) { |
| // attempt to delete the path (will fail and be ignored if the path now contains |
| // some files (possibly added concurrently)) |
| return fileSystem.delete(path, false); |
| } |
| else { |
| return false; |
| } |
| } |
| |
| /** |
| * Copies all files from source to target and sets executable flag. Paths might be on different systems. |
| * @param sourcePath source path to copy from |
| * @param targetPath target path to copy to |
| * @param executable if target file should be executable |
| * @throws IOException if the copy fails |
| */ |
| public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException { |
| // we unwrap the file system to get raw streams without safety net |
| FileSystem sFS = FileSystem.getUnguardedFileSystem(sourcePath.toUri()); |
| FileSystem tFS = FileSystem.getUnguardedFileSystem(targetPath.toUri()); |
| if (!tFS.exists(targetPath)) { |
| if (sFS.getFileStatus(sourcePath).isDir()) { |
| tFS.mkdirs(targetPath); |
| FileStatus[] contents = sFS.listStatus(sourcePath); |
| for (FileStatus content : contents) { |
| String distPath = content.getPath().toString(); |
| if (content.isDir()) { |
| if (distPath.endsWith("/")) { |
| distPath = distPath.substring(0, distPath.length() - 1); |
| } |
| } |
| String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/")); |
| copy(content.getPath(), new Path(localPath), executable); |
| } |
| } else { |
| try (FSDataOutputStream lfsOutput = tFS.create(targetPath, FileSystem.WriteMode.NO_OVERWRITE); FSDataInputStream fsInput = sFS.open(sourcePath)) { |
| IOUtils.copyBytes(fsInput, lfsOutput); |
| //noinspection ResultOfMethodCallIgnored |
| new File(targetPath.toString()).setExecutable(executable); |
| } catch (IOException ignored) { |
| } |
| } |
| } |
| } |
| |
| // ------------------------------------------------------------------------ |
| |
| /** |
| * Private default constructor to avoid instantiation. |
| */ |
| private FileUtils() {} |
| } |