blob: b5bdcaf59e23b94a24cab5f3c40b885c3e64ac05 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.filecache;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The FileCache is used to create the local files for the registered cache files when a task is deployed.
* The files will be removed when the task is unregistered after a 5 second delay.
* A given file x will be placed in "{@code <system-tmp-dir>/tmp_<jobID>/}".
*/
public class FileCache {
static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
/** cache-wide lock to ensure consistency. copies are not done under this lock */
private final Object lock = new Object();
private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
private final ScheduledExecutorService executorService;
private final File[] storageDirectories;
private final Thread shutdownHook;
private int nextDirectory;
// ------------------------------------------------------------------------
public FileCache(Configuration config) throws IOException {
String tempDirs = config.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
String[] directories = tempDirs.split(",|" + File.pathSeparator);
storageDirectories = new File[directories.length];
for (int i = 0; i < directories.length; i++) {
String cacheDirName = "flink-dist-cache-" + UUID.randomUUID().toString();
storageDirectories[i] = new File(directories[i], cacheDirName);
String path = storageDirectories[i].getAbsolutePath();
if (storageDirectories[i].mkdirs()) {
LOG.info("User file cache uses directory " + path);
} else {
LOG.error("User file cache cannot create directory " + path);
// delete all other directories we created so far
for (int k = 0; k < i; k++) {
if (!storageDirectories[k].delete()) {
LOG.warn("User file cache cannot remove prior directory " +
storageDirectories[k].getAbsolutePath());
}
}
throw new IOException("File cache cannot create temp storage directory: " + path);
}
}
this.shutdownHook = createShutdownHook(this, LOG);
this.entries = new HashMap<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>>();
this.executorService = Executors.newScheduledThreadPool(10, ExecutorThreadFactory.INSTANCE);
}
/**
* Shuts down the file cache by cancelling all
*/
public void shutdown() {
synchronized (lock) {
// first shutdown the thread pool
ScheduledExecutorService es = this.executorService;
if (es != null) {
es.shutdown();
try {
es.awaitTermination(5000L, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
// may happen
}
}
entries.clear();
// clean up the all storage directories
for (File dir : storageDirectories) {
try {
FileUtils.deleteDirectory(dir);
}
catch (IOException e) {
LOG.error("File cache could not properly clean up storage directory.");
}
}
// Remove shutdown hook to prevent resource leaks, unless this is invoked by the
// shutdown hook itself
if (shutdownHook != null && shutdownHook != Thread.currentThread()) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (IllegalStateException e) {
// race, JVM is in shutdown already, we can safely ignore this
}
catch (Throwable t) {
LOG.warn("Exception while unregistering file cache's cleanup shutdown hook.");
}
}
}
}
// ------------------------------------------------------------------------
/**
* If the file doesn't exists locally, it will copy the file to the temp directory.
*
* @param name The name under which the file is registered.
* @param entry The cache entry descriptor (path, executable flag)
* @param jobID The ID of the job for which the file is copied.
* @return The handle to the task that copies the file.
*/
public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
synchronized (lock) {
Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
if (jobEntries == null) {
jobEntries = new HashMap<String, Tuple4<Integer, File, Path, Future<Path>>>();
entries.put(jobID, jobEntries);
}
// tuple is (ref-count, parent-temp-dir, cached-file-path, copy-process)
Tuple4<Integer, File, Path, Future<Path>> fileEntry = jobEntries.get(name);
if (fileEntry != null) {
// file is already in the cache. return a future that
// immediately returns the file
fileEntry.f0 = fileEntry.f0 + 1;
// return the future. may be that the copy is still in progress
return fileEntry.f3;
}
else {
// need to copy the file
// create the target path
File tempDirToUse = new File(storageDirectories[nextDirectory++], jobID.toString());
if (nextDirectory >= storageDirectories.length) {
nextDirectory = 0;
}
String sourceFile = entry.filePath;
int posOfSep = sourceFile.lastIndexOf("/");
if (posOfSep > 0) {
sourceFile = sourceFile.substring(posOfSep + 1);
}
Path target = new Path(tempDirToUse.getAbsolutePath() + "/" + sourceFile);
// kick off the copying
CopyProcess cp = new CopyProcess(entry, target);
FutureTask<Path> copyTask = new FutureTask<Path>(cp);
executorService.submit(copyTask);
// store our entry
jobEntries.put(name, new Tuple4<Integer, File, Path, Future<Path>>(1, tempDirToUse, target, copyTask));
return copyTask;
}
}
}
/**
* Deletes the local file after a 5 second delay.
*
* @param name The name under which the file is registered.
* @param jobID The ID of the job for which the file is copied.
*/
public void deleteTmpFile(String name, JobID jobID) {
DeleteProcess dp = new DeleteProcess(lock, entries, name, jobID);
executorService.schedule(dp, 5000L, TimeUnit.MILLISECONDS);
}
boolean holdsStillReference(String name, JobID jobId) {
Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobId);
if (jobEntries != null) {
Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
return entry != null && entry.f0 > 0;
}
else {
return false;
}
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
public static void copy(Path sourcePath, Path targetPath, boolean executable) throws IOException {
FileSystem sFS = sourcePath.getFileSystem();
FileSystem tFS = targetPath.getFileSystem();
if (!tFS.exists(targetPath)) {
if (sFS.getFileStatus(sourcePath).isDir()) {
tFS.mkdirs(targetPath);
FileStatus[] contents = sFS.listStatus(sourcePath);
for (FileStatus content : contents) {
String distPath = content.getPath().toString();
if (content.isDir()) {
if (distPath.endsWith("/")) {
distPath = distPath.substring(0, distPath.length() - 1);
}
}
String localPath = targetPath.toString() + distPath.substring(distPath.lastIndexOf("/"));
copy(content.getPath(), new Path(localPath), executable);
}
} else {
try {
FSDataOutputStream lfsOutput = tFS.create(targetPath, false);
FSDataInputStream fsInput = sFS.open(sourcePath);
IOUtils.copyBytes(fsInput, lfsOutput);
//noinspection ResultOfMethodCallIgnored
new File(targetPath.toString()).setExecutable(executable);
// closing the FSDataOutputStream
lfsOutput.close();
}
catch (IOException ioe) {
LOG.error("could not copy file to local file cache.", ioe);
}
}
}
}
private static Thread createShutdownHook(final FileCache cache, final Logger logger) {
Thread shutdownHook = new Thread(new Runnable() {
@Override
public void run() {
try {
cache.shutdown();
}
catch (Throwable t) {
logger.error("Error during shutdown of file cache via JVM shutdown hook: " + t.getMessage(), t);
}
}
});
try {
// Add JVM shutdown hook to call shutdown of service
Runtime.getRuntime().addShutdownHook(shutdownHook);
return shutdownHook;
}
catch (IllegalStateException e) {
// JVM is already shutting down. no need to do our work
return null;
}
catch (Throwable t) {
logger.error("Cannot register shutdown hook that cleanly terminates the file cache service.");
return null;
}
}
// ------------------------------------------------------------------------
// background processes
// ------------------------------------------------------------------------
/**
* Asynchronous file copy process
*/
private static class CopyProcess implements Callable<Path> {
private final Path filePath;
private final Path cachedPath;
private boolean executable;
public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
this.filePath = new Path(e.filePath);
this.executable = e.isExecutable;
this.cachedPath = cachedPath;
}
@Override
public Path call() throws IOException {
// let exceptions propagate. we can retrieve them later from
// the future and report them upon access to the result
copy(filePath, cachedPath, this.executable);
return cachedPath;
}
}
/**
* If no task is using this file after 5 seconds, clear it.
*/
private static class DeleteProcess implements Runnable {
private final Object lock;
private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
private final String name;
private final JobID jobID;
public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
String name, JobID jobID)
{
this.lock = lock;
this.entries = entries;
this.name = name;
this.jobID = jobID;
}
@Override
public void run() {
try {
synchronized (lock) {
Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
if (jobEntries != null) {
Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
if (entry != null) {
int count = entry.f0;
if (count > 1) {
// multiple references still
entry.f0 = count - 1;
}
else {
// we remove the last reference
jobEntries.remove(name);
if (jobEntries.isEmpty()) {
entries.remove(jobID);
}
// abort the copy
entry.f3.cancel(true);
// remove the file
File file = new File(entry.f2.toString());
if (file.exists()) {
if (file.isDirectory()) {
FileUtils.deleteDirectory(file);
}
else if (!file.delete()) {
LOG.error("Could not delete locally cached file " + file.getAbsolutePath());
}
}
// remove the job wide temp directory, if it is now empty
File parent = entry.f1;
if (parent.isDirectory()) {
String[] children = parent.list();
if (children == null || children.length == 0) {
//noinspection ResultOfMethodCallIgnored
parent.delete();
}
}
}
}
}
}
}
catch (IOException e) {
LOG.error("Could not delete file from local file cache.", e);
}
}
}
}