HADOOP-17313. FileSystem.get to support slow-to-instantiate FS clients. (#2396)


This adds a semaphore to throttle the number of FileSystem instances which
can be created simultaneously, set in "fs.creation.parallel.count".

This is designed to reduce the impact of many threads in an application calling
FileSystem.get() on a filesystem which takes time to instantiate -for example
to an object store where HTTPS connections are set up during initialization.
Many threads trying to do this may create spurious delays by conflicting
for access to synchronized blocks, when simply limiting the parallelism
diminishes the conflict, so speeds up all threads trying to access
the store.

The default value, 64, is larger than is likely to deliver any speedup -but
it does mean that there should be no adverse effects from the change.

If a service appears to be blocking on all threads initializing connections to
abfs, s3a or another store, try a smaller (possibly significantly smaller) value.

Contributed by Steve Loughran.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 7556831..57446d3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -166,6 +166,27 @@
   public static final String  FS_AUTOMATIC_CLOSE_KEY = "fs.automatic.close";
   /** Default value for FS_AUTOMATIC_CLOSE_KEY */
   public static final boolean FS_AUTOMATIC_CLOSE_DEFAULT = true;
+
+  /**
+   * Number of filesystem instances which can be created in parallel.
+   * <p></p>
+   * A higher number here does not necessarily improve performance, especially
+   * for object stores, where multiple threads may be attempting to create an FS
+   * instance for the same URI.
+   * <p></p>
+   * Default value: {@value #FS_CREATION_PARALLEL_COUNT_DEFAULT}.
+   */
+  public static final String FS_CREATION_PARALLEL_COUNT =
+      "fs.creation.parallel.count";
+
+  /**
+   * Default value for {@link #FS_CREATION_PARALLEL_COUNT}.
+   * <p></p>
+   * Default value: {@value}.
+   */
+  public static final int FS_CREATION_PARALLEL_COUNT_DEFAULT =
+      64;
+
   /**
    * @see
    * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 9fffb8a..e814b3d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -21,6 +21,7 @@
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.lang.ref.WeakReference;
 import java.lang.ref.ReferenceQueue;
 import java.net.URI;
@@ -44,6 +45,7 @@
 import java.util.Stack;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
@@ -75,6 +77,7 @@
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -200,7 +203,7 @@
   public static final String USER_HOME_PREFIX = "/user";
 
   /** FileSystem cache. */
-  static final Cache CACHE = new Cache();
+  static final Cache CACHE = new Cache(new Configuration());
 
   /** The key this instance is stored under in the cache. */
   private Cache.Key key;
@@ -2591,8 +2594,11 @@
         + "; Object Identity Hash: "
         + Integer.toHexString(System.identityHashCode(this)));
     // delete all files that were marked as delete-on-exit.
-    processDeleteOnExit();
-    CACHE.remove(this.key, this);
+    try {
+      processDeleteOnExit();
+    } finally {
+      CACHE.remove(this.key, this);
+    }
   }
 
   /**
@@ -3453,7 +3459,9 @@
   private static FileSystem createFileSystem(URI uri, Configuration conf)
       throws IOException {
     Tracer tracer = FsTracer.get(conf);
-    try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem")) {
+    try(TraceScope scope = tracer.newScope("FileSystem#createFileSystem");
+        DurationInfo ignored =
+            new DurationInfo(LOGGER, false, "Creating FS %s", uri)) {
       scope.addKVAnnotation("scheme", uri.getScheme());
       Class<? extends FileSystem> clazz =
           getFileSystemClass(uri.getScheme(), conf);
@@ -3476,15 +3484,39 @@
   }
 
   /** Caching FileSystem objects. */
-  static class Cache {
+  static final class Cache {
     private final ClientFinalizer clientFinalizer = new ClientFinalizer();
 
     private final Map<Key, FileSystem> map = new HashMap<>();
     private final Set<Key> toAutoClose = new HashSet<>();
 
+    /** Semaphore used to throttle parallel creation of new FS instances. */
+    private final Semaphore creatorPermits;
+
+    /**
+     * Counter of the number of discarded filesystem instances
+     * in this cache. Primarily for testing, but it could possibly
+     * be made visible as some kind of metric.
+     */
+    private final AtomicLong discardedInstances = new AtomicLong(0);
+
     /** A variable that makes all objects in the cache unique. */
     private static AtomicLong unique = new AtomicLong(1);
 
+    /**
+     * Instantiate. The configuration is used to read the
+     * count of permits issued for concurrent creation
+     * of filesystem instances.
+     * @param conf configuration
+     */
+    Cache(final Configuration conf) {
+      int permits = conf.getInt(FS_CREATION_PARALLEL_COUNT,
+          FS_CREATION_PARALLEL_COUNT_DEFAULT);
+      checkArgument(permits > 0, "Invalid value of %s: %s",
+          FS_CREATION_PARALLEL_COUNT, permits);
+      creatorPermits = new Semaphore(permits);
+    }
+
     FileSystem get(URI uri, Configuration conf) throws IOException{
       Key key = new Key(uri, conf);
       return getInternal(uri, conf, key);
@@ -3518,33 +3550,86 @@
       if (fs != null) {
         return fs;
       }
-
-      fs = createFileSystem(uri, conf);
-      final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
-          SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
-          ShutdownHookManager.TIME_UNIT_DEFAULT);
-      synchronized (this) { // refetch the lock again
-        FileSystem oldfs = map.get(key);
-        if (oldfs != null) { // a file system is created while lock is releasing
-          fs.close(); // close the new file system
-          return oldfs;  // return the old file system
-        }
-
-        // now insert the new file system into the map
-        if (map.isEmpty()
-                && !ShutdownHookManager.get().isShutdownInProgress()) {
-          ShutdownHookManager.get().addShutdownHook(clientFinalizer,
-              SHUTDOWN_HOOK_PRIORITY, timeout,
-              ShutdownHookManager.TIME_UNIT_DEFAULT);
-        }
-        fs.key = key;
-        map.put(key, fs);
-        if (conf.getBoolean(
-            FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
-          toAutoClose.add(key);
-        }
-        return fs;
+      // fs not yet created, acquire a permit from the creator semaphore
+      // to construct an instance.
+      try (DurationInfo d = new DurationInfo(LOGGER, false,
+          "Acquiring creator semaphore for %s", uri)) {
+        creatorPermits.acquire();
+      } catch (InterruptedException e) {
+        // acquisition was interrupted; convert to an IOE.
+        throw (IOException)new InterruptedIOException(e.toString())
+            .initCause(e);
       }
+      FileSystem fsToClose = null;
+      try {
+        // See if FS was instantiated by another thread while waiting
+        // for the permit.
+        synchronized (this) {
+          fs = map.get(key);
+        }
+        if (fs != null) {
+          LOGGER.debug("Filesystem {} created while awaiting semaphore", uri);
+          return fs;
+        }
+        // create the filesystem
+        fs = createFileSystem(uri, conf);
+        final long timeout = conf.getTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
+            SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
+            ShutdownHookManager.TIME_UNIT_DEFAULT);
+        // any FS to close outside of the synchronized section
+        synchronized (this) { // lock on the Cache object
+
+          // see if there is now an entry for the FS, which happens
+          // if another thread's creation overlapped with this one.
+          FileSystem oldfs = map.get(key);
+          if (oldfs != null) {
+            // a file system was created in a separate thread.
+            // save the FS reference to close outside all locks,
+            // and switch to returning the oldFS
+            fsToClose = fs;
+            fs = oldfs;
+          } else {
+            // register the clientFinalizer if needed and shutdown isn't
+            // already active
+            if (map.isEmpty()
+                && !ShutdownHookManager.get().isShutdownInProgress()) {
+              ShutdownHookManager.get().addShutdownHook(clientFinalizer,
+                  SHUTDOWN_HOOK_PRIORITY, timeout,
+                  ShutdownHookManager.TIME_UNIT_DEFAULT);
+            }
+            // insert the new file system into the map
+            fs.key = key;
+            map.put(key, fs);
+            if (conf.getBoolean(
+                FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
+              toAutoClose.add(key);
+            }
+          }
+        } // end of synchronized block
+      } finally {
+        // release the creator permit.
+        creatorPermits.release();
+      }
+      if (fsToClose != null) {
+        LOGGER.debug("Duplicate FS created for {}; discarding {}",
+            uri, fs);
+        discardedInstances.incrementAndGet();
+        // close the new file system
+        // note this will briefly remove and reinstate "fsToClose" from
+        // the map. It is done in a synchronized block so will not be
+        // visible to others.
+        IOUtils.cleanupWithLogger(LOGGER, fsToClose);
+      }
+      return fs;
+    }
+
+    /**
+     * Get the count of discarded instances.
+     * @return the number of discarded instances.
+     */
+    @VisibleForTesting
+    long getDiscardedInstances() {
+      return discardedInstances.get();
     }
 
     synchronized void remove(Key key, FileSystem fs) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
index b3c3847..01abeaa 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFileSystemCaching.java
@@ -21,22 +21,30 @@
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
-import java.security.PrivilegedExceptionAction;
-import java.util.concurrent.Semaphore;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_CREATION_PARALLEL_COUNT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 
-public class TestFileSystemCaching {
+public class TestFileSystemCaching extends HadoopTestBase {
 
   @Test
   public void testCacheEnabled() throws Exception {
@@ -336,4 +344,133 @@
     assertNotEquals(keyA, new FileSystem.Cache.Key(
         new URI("wasb://a:password@account.blob.core.windows.net"), conf));
   }
+
+
+  /**
+   * Single semaphore: no surplus FS instances will be created
+   * and then discarded.
+   */
+  @Test
+  public void testCacheSingleSemaphoredConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(1);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(0);
+  }
+
+  /**
+   * Dual semaphore: thread 2 will get as far as
+   * blocking in the initialize() method while awaiting
+   * thread 1 to complete its initialization.
+   * <p></p>
+   * The thread 2 FS instance will be discarded.
+   * All other threads will block for a cache semaphore,
+   * so when they are given an opportunity to proceed,
+   * they will find that an FS instance exists.
+   */
+  @Test
+  public void testCacheDualSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(2);
+    createFileSystems(cache, 10);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(1);
+  }
+
+  /**
+   * Construct the FS instances in a cache with effectively no
+   * limit on the number of instances which can be created
+   * simultaneously.
+   * <p></p>
+   * This is the effective state before HADOOP-17313.
+   * <p></p>
+   * All but one thread's FS instance will be discarded.
+   */
+  @Test
+  public void testCacheLargeSemaphoreConstruction() throws Exception {
+    FileSystem.Cache cache = semaphoredCache(999);
+    int count = 10;
+    createFileSystems(cache, count);
+    Assertions.assertThat(cache.getDiscardedInstances())
+        .describedAs("Discarded FS instances")
+        .isEqualTo(count -1);
+  }
+
+  /**
+   * Create a cache with a given semaphore size.
+   * @param semaphores number of permits in the creation semaphore
+   * @return the cache.
+   */
+  private FileSystem.Cache semaphoredCache(final int semaphores) {
+    final Configuration conf1 = new Configuration();
+    conf1.setInt(FS_CREATION_PARALLEL_COUNT, semaphores);
+    FileSystem.Cache cache = new FileSystem.Cache(conf1);
+    return cache;
+  }
+
+  /**
+   * Attempt to create {@code count} filesystems in parallel,
+   * then assert that they are all equal.
+   * @param cache cache to use
+   * @param count count of filesystems to instantiate
+   */
+  private void createFileSystems(final FileSystem.Cache cache, final int count)
+      throws URISyntaxException, InterruptedException,
+             java.util.concurrent.ExecutionException {
+    final Configuration conf = new Configuration();
+    conf.set("fs.blocking.impl", BlockingInitializer.NAME);
+    // only one instance can be created at a time.
+    URI uri = new URI("blocking://a");
+    ListeningExecutorService pool =
+        BlockingThreadPoolExecutorService.newInstance(count * 2, 0,
+            10, TimeUnit.SECONDS,
+            "creation-threads");
+
+    // submit a set of requests to create an FS instance.
+    // the semaphore will block all but one, and that will block until
+    // it is allowed to continue
+    List<ListenableFuture<FileSystem>> futures = new ArrayList<>(count);
+
+    // acquire the semaphore so blocking all FS instances from
+    // being initialized.
+    Semaphore semaphore = BlockingInitializer.SEM;
+    semaphore.acquire();
+
+    for (int i = 0; i < count; i++) {
+      futures.add(pool.submit(
+          () -> cache.get(uri, conf)));
+    }
+    // now let all blocked initializers free
+    semaphore.release();
+    // get that first FS
+    FileSystem createdFS = futures.get(0).get();
+    // verify all the others are the same instance
+    for (int i = 1; i < count; i++) {
+      FileSystem fs = futures.get(i).get();
+      Assertions.assertThat(fs)
+          .isSameAs(createdFS);
+    }
+  }
+
+  /**
+   * An FS which blocks in initialize() until it can acquire the shared
+   * semaphore (which it then releases).
+   */
+  private static final class BlockingInitializer extends LocalFileSystem {
+
+    private static final String NAME = BlockingInitializer.class.getName();
+
+    private static final Semaphore SEM = new Semaphore(1);
+
+    @Override
+    public void initialize(URI uri, Configuration conf) throws IOException {
+      try {
+        SEM.acquire();
+        SEM.release();
+      } catch (InterruptedException e) {
+        throw new IOException(e.toString(), e);
+      }
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
index 68e768d..c8f1c0e 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
@@ -629,6 +629,8 @@
 
 ## Tuning FileSystem Initialization.
 
+### Disabling bucket existence checks
+
 When an S3A Filesystem instance is created and initialized, the client
 checks if the bucket provided is valid. This can be slow.
 You can ignore bucket validation by configuring `fs.s3a.bucket.probe` as follows:
@@ -642,3 +644,52 @@
 
 Note: if the bucket does not exist, this issue will surface when operations are performed
 on the filesystem; you will see `UnknownStoreException` stack traces.
+
+### Rate limiting parallel FileSystem creation operations
+
+Applications normally ask for filesystems from the shared cache,
+via `FileSystem.get()` or `Path.getFileSystem()`.
+The cache, `FileSystem.CACHE`, will, for each user, cache one instance of a filesystem
+for a given URI.
+All calls to `FileSystem.get` for a cached FS for a URI such
+as `s3a://landsat-pds/` will return that single instance.
+
+FileSystem instances are created on-demand for the cache,
+and will be done in each thread which requests an instance.
+This is done outside of any synchronisation block.
+Once a task has an initialized FileSystem instance, it will, in a synchronized block,
+add it to the cache.
+If it turns out that the cache now already has an instance for that URI, it will
+revert to the cached copy, and close the FS instance it has just created.
+
+If a FileSystem takes time to be initialized, and many threads are trying to
+retrieve a FileSystem instance for the same S3 bucket in parallel,
+all but one of the threads will be doing useless work, and may unintentionally
+be creating lock contention on shared objects.
+
+There is an option, `fs.creation.parallel.count`, which uses a semaphore
+to limit the number of FS instances which may be created in parallel.
+
+Setting this to a low number will reduce the amount of wasted work,
+at the expense of limiting the number of FileSystem clients which
+can be created simultaneously for different object stores/distributed
+filesystems.
+
+For example, a value of four would put an upper limit on the number
+of wasted instantiations of a connector for the `s3a://landsat-pds/`
+bucket.
+
+```xml
+<property>
+  <name>fs.creation.parallel.count</name>
+  <value>4</value>
+</property>
+```
+
+It would also mean that if four threads were in the process
+of creating such connectors, all threads trying to create
+connectors for other buckets would end up blocking too.
+
+Consider experimenting with this when running applications
+where many threads may try to simultaneously interact
+with the same slow-to-initialize object stores.
\ No newline at end of file