blob: d72aec0343959734a8ade133d579b7cbab5ed842 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.annotation.Internal;
import org.apache.flink.util.IOUtils;
import java.net.URI;
import static org.apache.flink.util.Preconditions.checkState;
/**
* The FileSystemSafetyNet can be used to guard a thread against {@link FileSystem} stream resource leaks.
* When activated for a thread, it tracks all streams that are opened by FileSystems that the thread
* obtains. The safety net has a global cleanup hook that will close all streams that were
* not properly closed.
*
* <p>The main thread of each Flink task, as well as the checkpointing thread are automatically guarded
* by this safety net.
*
* <p><b>Important:</b> This safety net works only for streams created by Flink's FileSystem abstraction,
* i.e., for {@code FileSystem} instances obtained via {@link FileSystem#get(URI)} or through
* {@link Path#getFileSystem()}.
*
* <p><b>Important:</b> When a guarded thread obtains a {@code FileSystem} or a stream and passes them
* to another thread, the safety net will close those resources once the former thread finishes.
*
* <p>The safety net can be used as follows:
* <pre>{@code
*
* class GuardedThread extends Thread {
*
* public void run() {
* FileSystemSafetyNet.initializeSafetyNetForThread();
* try {
* // do some heavy stuff where you are unsure whether it closes all streams
* // like some untrusted user code or library code
* }
* finally {
* FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
* }
* }
* }
* }</pre>
*/
@Internal
public class FileSystemSafetyNet {
/** The map from thread to the safety net registry for that thread. */
private static final ThreadLocal<SafetyNetCloseableRegistry> REGISTRIES = new ThreadLocal<>();
// ------------------------------------------------------------------------
// Activating / Deactivating
// ------------------------------------------------------------------------
/**
* Activates the safety net for a thread. {@link FileSystem} instances obtained by the thread
* that called this method will be guarded, meaning that their created streams are tracked and can
* be closed via the safety net closing hook.
*
* <p>This method should be called at the beginning of a thread that should be guarded.
*
* @throws IllegalStateException Thrown, if a safety net was already registered for the thread.
*/
@Internal
public static void initializeSafetyNetForThread() {
SafetyNetCloseableRegistry oldRegistry = REGISTRIES.get();
checkState(null == oldRegistry, "Found an existing FileSystem safety net for this thread: %s " +
"This may indicate an accidental repeated initialization, or a leak of the" +
"(Inheritable)ThreadLocal through a ThreadPool.", oldRegistry);
SafetyNetCloseableRegistry newRegistry = new SafetyNetCloseableRegistry();
REGISTRIES.set(newRegistry);
}
/**
* Closes the safety net for a thread. This closes all remaining unclosed streams that were opened
* by safety-net-guarded file systems. After this method was called, no streams can be opened any more
* from any FileSystem instance that was obtained while the thread was guarded by the safety net.
*
* <p>This method should be called at the very end of a guarded thread.
*/
@Internal
public static void closeSafetyNetAndGuardedResourcesForThread() {
SafetyNetCloseableRegistry registry = REGISTRIES.get();
if (null != registry) {
REGISTRIES.remove();
IOUtils.closeQuietly(registry);
}
}
/**
* Returns the active safety-net registry for the current thread.
* @deprecated This method should be removed after FLINK-6684 is implemented.
*/
@Deprecated
@Internal
public static SafetyNetCloseableRegistry getSafetyNetCloseableRegistryForThread() {
return REGISTRIES.get();
}
/**
* Sets the active safety-net registry for the current thread.
*/
@Internal
public static void setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry) {
REGISTRIES.set(registry);
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
static FileSystem wrapWithSafetyNetWhenActivated(FileSystem fs) {
SafetyNetCloseableRegistry reg = REGISTRIES.get();
return reg != null ? new SafetyNetWrapperFileSystem(fs, reg) : fs;
}
}