HADOOP-17461. Collect thread-level IOStatistics. (#4352)


This adds a thread-level collector of IOStatistics, IOStatisticsContext,
which can be:
* Retrieved for a thread and cached for access from other
  threads.
* reset() to record new statistics.
* Queried for live statistics through the
  IOStatisticsSource.getIOStatistics() method.
* Queries for a statistics aggregator for use in instrumented
  classes.
* Asked to create a serializable copy in snapshot()

The goal is to make it possible for applications with multiple
threads performing different work items simultaneously
to be able to collect statistics on the individual threads,
and so generate aggregate reports on the total work performed
for a specific job, query or similar unit of work.

Some changes in IOStatistics-gathering classes are needed for 
this feature
* Caching the active context's aggregator in the object's
  constructor
* Updating it in close()

Slightly more work is needed in multithreaded code,
such as the S3A committers, which collect statistics across
all threads used in task and job commit operations.

Currently the IOStatisticsContext-aware classes are:
* The S3A input stream, output stream and list iterators.
* RawLocalFileSystem's input and output streams.
* The S3A committers.
* The TaskPool class in hadoop-common, which propagates
  the active context into scheduled worker threads.

Collection of statistics in the IOStatisticsContext
is disabled process-wide by default until the feature 
is considered stable.

To enable the collection, set the option
fs.thread.level.iostatistics.enabled
to "true" in core-site.xml;
	
Contributed by Mehakmeet Singh and Steve Loughran
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 1614420..7c54b32 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -475,4 +475,18 @@
    * default hadoop temp dir on local system: {@value}.
    */
   public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
+
+  /**
+   * Thread-level IOStats Support.
+   * {@value}
+   */
+  public static final String THREAD_LEVEL_IOSTATISTICS_ENABLED =
+      "fs.thread.level.iostatistics.enabled";
+
+  /**
+   * Default value for Thread-level IOStats Support is true.
+   */
+  public static final boolean THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT =
+      true;
+
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index f525c3c..d9ceab9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -57,6 +57,8 @@
 import org.apache.hadoop.fs.impl.StoreImplementationUtils;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.statistics.BufferedIOStatisticsOutputStream;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
@@ -156,11 +158,19 @@
     /** Reference to the bytes read counter for slightly faster counting. */
     private final AtomicLong bytesRead;
 
+    /**
+     * Thread level IOStatistics aggregator to update in close().
+     */
+    private final IOStatisticsAggregator
+        ioStatisticsAggregator;
+
     public LocalFSFileInputStream(Path f) throws IOException {
       name = pathToFile(f);
       fis = new FileInputStream(name);
       bytesRead = ioStatistics.getCounterReference(
           STREAM_READ_BYTES);
+      ioStatisticsAggregator =
+          IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
     }
     
     @Override
@@ -193,9 +203,13 @@
 
     @Override
     public void close() throws IOException {
-      fis.close();
-      if (asyncChannel != null) {
-        asyncChannel.close();
+      try {
+        fis.close();
+        if (asyncChannel != null) {
+          asyncChannel.close();
+        }
+      } finally {
+        ioStatisticsAggregator.aggregate(ioStatistics);
       }
     }
 
@@ -278,6 +292,7 @@
       // new capabilities.
       switch (capability.toLowerCase(Locale.ENGLISH)) {
       case StreamCapabilities.IOSTATISTICS:
+      case StreamCapabilities.IOSTATISTICS_CONTEXT:
       case StreamCapabilities.VECTOREDIO:
         return true;
       default:
@@ -407,9 +422,19 @@
             STREAM_WRITE_EXCEPTIONS)
         .build();
 
+    /**
+     * Thread level IOStatistics aggregator to update in close().
+     */
+    private final IOStatisticsAggregator
+        ioStatisticsAggregator;
+
     private LocalFSFileOutputStream(Path f, boolean append,
         FsPermission permission) throws IOException {
       File file = pathToFile(f);
+      // store the aggregator before attempting any IO.
+      ioStatisticsAggregator =
+          IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator();
+
       if (!append && permission == null) {
         permission = FsPermission.getFileDefault();
       }
@@ -436,10 +461,17 @@
     }
 
     /*
-     * Just forward to the fos
+     * Close the fos; update the IOStatisticsContext.
      */
     @Override
-    public void close() throws IOException { fos.close(); }
+    public void close() throws IOException {
+      try {
+        fos.close();
+      } finally {
+        ioStatisticsAggregator.aggregate(ioStatistics);
+      }
+    }
+
     @Override
     public void flush() throws IOException { fos.flush(); }
     @Override
@@ -485,6 +517,7 @@
       // new capabilities.
       switch (capability.toLowerCase(Locale.ENGLISH)) {
       case StreamCapabilities.IOSTATISTICS:
+      case StreamCapabilities.IOSTATISTICS_CONTEXT:
         return true;
       default:
         return StoreImplementationUtils.isProbeForSyncable(capability);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index d68ef50..c925e50 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -94,6 +94,12 @@
   String ABORTABLE_STREAM =  CommonPathCapabilities.ABORTABLE_STREAM;
 
   /**
+   * Streams that support IOStatistics context and capture thread-level
+   * IOStatistics.
+   */
+  String IOSTATISTICS_CONTEXT = "fs.capability.iocontext.supported";
+
+  /**
    * Capabilities that a stream can support and be queried for.
    */
   @Deprecated
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java
index b24bef2..16fe0da 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.impl;
 
+import java.lang.ref.WeakReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import javax.annotation.Nullable;
@@ -48,7 +49,17 @@
   }
 
   public V setForCurrentThread(V newVal) {
-    return put(currentThreadId(), newVal);
+    long id = currentThreadId();
+
+    // if the same object is already in the map, just return it.
+    WeakReference<V> ref = lookup(id);
+    // Reference value could be set to null. Thus, ref.get() could return
+    // null. Should be handled accordingly while using the returned value.
+    if (ref != null && ref.get() == newVal) {
+      return ref.get();
+    }
+
+    return put(id, newVal);
   }
 
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java
new file mode 100644
index 0000000..fb10b93
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsContext.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration;
+
+/**
+ * An interface defined to capture thread-level IOStatistics by using per
+ * thread context.
+ * <p>
+ * The aggregator should be collected in their constructor by statistics-generating
+ * classes to obtain the aggregator to update <i>across all threads</i>.
+ * <p>
+ * The {@link #snapshot()} call creates a snapshot of the statistics;
+ * <p>
+ * The {@link #reset()} call resets the statistics in the context so
+ * that later snapshots will get the incremental data.
+ */
+public interface IOStatisticsContext extends IOStatisticsSource {
+
+  /**
+   * Get the IOStatisticsAggregator for the context.
+   *
+   * @return return the aggregator for the context.
+   */
+  IOStatisticsAggregator getAggregator();
+
+  /**
+   * Capture the snapshot of the context's IOStatistics.
+   *
+   * @return IOStatisticsSnapshot for the context.
+   */
+  IOStatisticsSnapshot snapshot();
+
+  /**
+   * Get a unique ID for this context, for logging
+   * purposes.
+   *
+   * @return an ID unique for all contexts in this process.
+   */
+  long getID();
+
+  /**
+   * Reset the context's IOStatistics.
+   */
+  void reset();
+
+  /**
+   * Get the context's IOStatisticsContext.
+   *
+   * @return instance of IOStatisticsContext for the context.
+   */
+  static IOStatisticsContext getCurrentIOStatisticsContext() {
+    return IOStatisticsContextIntegration.getCurrentIOStatisticsContext();
+  }
+
+  /**
+   * Set the IOStatisticsContext for the current thread.
+   * @param statisticsContext IOStatistics context instance for the
+   * current thread. If null, the context is reset.
+   */
+  static void setThreadIOStatisticsContext(
+      IOStatisticsContext statisticsContext) {
+    IOStatisticsContextIntegration.setThreadIOStatisticsContext(
+        statisticsContext);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java
new file mode 100644
index 0000000..b672f66
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/EmptyIOStatisticsContextImpl.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+/**
+ * Empty IOStatistics context which serves no-op for all the operations and
+ * returns an empty Snapshot if asked.
+ *
+ */
+final class EmptyIOStatisticsContextImpl implements IOStatisticsContext {
+
+  private static final IOStatisticsContext EMPTY_CONTEXT = new EmptyIOStatisticsContextImpl();
+
+  private EmptyIOStatisticsContextImpl() {
+  }
+
+  /**
+   * Create a new empty snapshot.
+   * A new one is always created for isolation.
+   *
+   * @return a statistics snapshot
+   */
+  @Override
+  public IOStatisticsSnapshot snapshot() {
+    return new IOStatisticsSnapshot();
+  }
+
+  @Override
+  public IOStatisticsAggregator getAggregator() {
+    return EmptyIOStatisticsStore.getInstance();
+  }
+
+  @Override
+  public IOStatistics getIOStatistics() {
+    return EmptyIOStatistics.getInstance();
+  }
+
+  @Override
+  public void reset() {}
+
+  /**
+   * The ID is always 0.
+   * As the real context implementation counter starts at 1,
+   * we are guaranteed to have unique IDs even between them and
+   * the empty context.
+   * @return 0
+   */
+  @Override
+  public long getID() {
+    return 0;
+  }
+
+  /**
+   * Get the single instance.
+   * @return an instance.
+   */
+  static IOStatisticsContext getInstance() {
+    return EMPTY_CONTEXT;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java
new file mode 100644
index 0000000..97a8528
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+
+/**
+ * Implementing the IOStatisticsContext.
+ *
+ * A Context defined for IOStatistics collection per thread which captures
+ * each worker thread's work in FS streams and stores it in the form of
+ * IOStatisticsSnapshot.
+ *
+ * For the current thread the IOStatisticsSnapshot can be used as a way to
+ * move the IOStatistics data between applications using the Serializable
+ * nature of the class.
+ */
+public final class IOStatisticsContextImpl implements IOStatisticsContext {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContextImpl.class);
+
+  /**
+   * Thread ID.
+   */
+  private final long threadId;
+
+  /**
+   * Unique ID.
+   */
+  private final long id;
+
+  /**
+   * IOStatistics to aggregate.
+   */
+  private final IOStatisticsSnapshot ioStatistics = new IOStatisticsSnapshot();
+
+  /**
+   * Constructor.
+   * @param threadId thread ID
+   * @param id instance ID.
+   */
+  public IOStatisticsContextImpl(final long threadId, final long id) {
+    this.threadId = threadId;
+    this.id = id;
+  }
+
+  @Override
+  public String toString() {
+    return "IOStatisticsContextImpl{" +
+        "id=" + id +
+        ", threadId=" + threadId +
+        ", ioStatistics=" + ioStatistics +
+        '}';
+  }
+
+  /**
+   * Get the IOStatisticsAggregator of the context.
+   * @return the instance of IOStatisticsAggregator for this context.
+   */
+  @Override
+  public IOStatisticsAggregator getAggregator() {
+    return ioStatistics;
+  }
+
+  /**
+   * Returns a snapshot of the current thread's IOStatistics.
+   *
+   * @return IOStatisticsSnapshot of the context.
+   */
+  @Override
+  public IOStatisticsSnapshot snapshot() {
+    LOG.debug("Taking snapshot of IOStatisticsContext id {}", id);
+    return new IOStatisticsSnapshot(ioStatistics);
+  }
+
+  /**
+   * Reset the thread +.
+   */
+  @Override
+  public void reset() {
+    LOG.debug("clearing IOStatisticsContext id {}", id);
+    ioStatistics.clear();
+  }
+
+  @Override
+  public IOStatistics getIOStatistics() {
+    return ioStatistics;
+  }
+
+  /**
+   * ID of this context.
+   * @return ID.
+   */
+  @Override
+  public long getID() {
+    return id;
+  }
+
+  /**
+   * Get the thread ID.
+   * @return thread ID.
+   */
+  public long getThreadID() {
+    return threadId;
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java
new file mode 100644
index 0000000..483d1e4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/IOStatisticsContextIntegration.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.lang.ref.WeakReference;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.impl.WeakReferenceThreadMap;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT;
+
+/**
+ * A Utility class for IOStatisticsContext, which helps in creating and
+ * getting the current active context. Static methods in this class allows to
+ * get the current context to start aggregating the IOStatistics.
+ *
+ * Static initializer is used to work out if the feature to collect
+ * thread-level IOStatistics is enabled or not and the corresponding
+ * implementation class is called for it.
+ *
+ * Weak Reference thread map to be used to keep track of different context's
+ * to avoid long-lived memory leakages as these references would be cleaned
+ * up at GC.
+ */
+public final class IOStatisticsContextIntegration {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(IOStatisticsContextIntegration.class);
+
+  /**
+   * Is thread-level IO Statistics enabled?
+   */
+  private static boolean isThreadIOStatsEnabled;
+
+  /**
+   * ID for next instance to create.
+   */
+  public static final AtomicLong INSTANCE_ID = new AtomicLong(1);
+
+  /**
+   * Active IOStatistics Context containing different worker thread's
+   * statistics. Weak Reference so that it gets cleaned up during GC and we
+   * avoid any memory leak issues due to long lived references.
+   */
+  private static final WeakReferenceThreadMap<IOStatisticsContext>
+      ACTIVE_IOSTATS_CONTEXT =
+      new WeakReferenceThreadMap<>(
+          IOStatisticsContextIntegration::createNewInstance,
+          IOStatisticsContextIntegration::referenceLostContext
+      );
+
+  static {
+    // Work out if the current context has thread level IOStatistics enabled.
+    final Configuration configuration = new Configuration();
+    isThreadIOStatsEnabled =
+        configuration.getBoolean(THREAD_LEVEL_IOSTATISTICS_ENABLED,
+            THREAD_LEVEL_IOSTATISTICS_ENABLED_DEFAULT);
+  }
+
+  /**
+   * Private constructor for a utility class to be used in IOStatisticsContext.
+   */
+  private IOStatisticsContextIntegration() {}
+
+  /**
+   * Creating a new IOStatisticsContext instance for a FS to be used.
+   * @param key Thread ID that represents which thread the context belongs to.
+   * @return an instance of IOStatisticsContext.
+   */
+  private static IOStatisticsContext createNewInstance(Long key) {
+    return new IOStatisticsContextImpl(key, INSTANCE_ID.getAndIncrement());
+  }
+
+  /**
+   * In case of reference loss for IOStatisticsContext.
+   * @param key ThreadID.
+   */
+  private static void referenceLostContext(Long key) {
+    LOG.debug("Reference lost for threadID for the context: {}", key);
+  }
+
+  /**
+   * Get the current thread's IOStatisticsContext instance. If no instance is
+   * present for this thread ID, create one using the factory.
+   * @return instance of IOStatisticsContext.
+   */
+  public static IOStatisticsContext getCurrentIOStatisticsContext() {
+    return isThreadIOStatsEnabled
+        ? ACTIVE_IOSTATS_CONTEXT.getForCurrentThread()
+        : EmptyIOStatisticsContextImpl.getInstance();
+  }
+
+  /**
+   * Set the IOStatisticsContext for the current thread.
+   * @param statisticsContext IOStatistics context instance for the
+   * current thread. If null, the context is reset.
+   */
+  public static void setThreadIOStatisticsContext(
+      IOStatisticsContext statisticsContext) {
+    if (isThreadIOStatsEnabled) {
+      if (statisticsContext == null) {
+        ACTIVE_IOSTATS_CONTEXT.removeForCurrentThread();
+      }
+      if (ACTIVE_IOSTATS_CONTEXT.getForCurrentThread() != statisticsContext) {
+        ACTIVE_IOSTATS_CONTEXT.setForCurrentThread(statisticsContext);
+      }
+    }
+  }
+
+  /**
+   * Get thread ID specific IOStatistics values if
+   * statistics are enabled and the thread ID is in the map.
+   * @param testThreadId thread ID.
+   * @return IOStatisticsContext if found in the map.
+   */
+  @VisibleForTesting
+  public static IOStatisticsContext getThreadSpecificIOStatisticsContext(long testThreadId) {
+    LOG.debug("IOStatsContext thread ID required: {}", testThreadId);
+
+    if (!isThreadIOStatsEnabled) {
+      return null;
+    }
+    // lookup the weakRef IOStatisticsContext for the thread ID in the
+    // ThreadMap.
+    WeakReference<IOStatisticsContext> ioStatisticsSnapshotWeakReference =
+        ACTIVE_IOSTATS_CONTEXT.lookup(testThreadId);
+    if (ioStatisticsSnapshotWeakReference != null) {
+      return ioStatisticsSnapshotWeakReference.get();
+    }
+    return null;
+  }
+
+  /**
+   * A method to enable IOStatisticsContext to override if set otherwise in
+   * the configurations for tests.
+   */
+  @VisibleForTesting
+  public static void enableIOStatisticsContext() {
+    if (!isThreadIOStatsEnabled) {
+      LOG.info("Enabling Thread IOStatistics..");
+      isThreadIOStatsEnabled = true;
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java
index 0abaab2..c9e6d0b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/TaskPool.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.util.functional.RemoteIterators.remoteIteratorFromIterable;
@@ -137,6 +138,15 @@
     private int sleepInterval = SLEEP_INTERVAL_AWAITING_COMPLETION;
 
     /**
+     * IOStatisticsContext to switch to in all threads
+     * taking part in the commit operation.
+     * This ensures that the IOStatistics collected in the
+     * worker threads will be aggregated into the total statistics
+     * of the thread calling the committer commit/abort methods.
+     */
+    private IOStatisticsContext ioStatisticsContext = null;
+
+    /**
      * Create the builder.
      * @param items items to process
      */
@@ -242,7 +252,7 @@
      * @param value new value
      * @return the builder
      */
-    public Builder sleepInterval(final int value) {
+    public Builder<I> sleepInterval(final int value) {
       sleepInterval = value;
       return this;
     }
@@ -364,6 +374,8 @@
 
     /**
      * Parallel execution.
+     * All tasks run within the same IOStatisticsContext as the
+     * thread calling this method.
      * @param task task to execute
      * @param <E> exception which may be raised in execution.
      * @return true if the operation executed successfully
@@ -379,64 +391,70 @@
       final AtomicBoolean revertFailed = new AtomicBoolean(false);
 
       List<Future<?>> futures = new ArrayList<>();
+      ioStatisticsContext = IOStatisticsContext.getCurrentIOStatisticsContext();
 
       IOException iteratorIOE = null;
       final RemoteIterator<I> iterator = this.items;
       try {
-        while(iterator.hasNext()) {
+        while (iterator.hasNext()) {
           final I item = iterator.next();
           // submit a task for each item that will either run or abort the task
           futures.add(service.submit(() -> {
-            if (!(stopOnFailure && taskFailed.get())) {
-              // run the task
-              boolean threw = true;
-              try {
-                LOG.debug("Executing task");
-                task.run(item);
-                succeeded.add(item);
-                LOG.debug("Task succeeded");
+            setStatisticsContext();
+            try {
+              if (!(stopOnFailure && taskFailed.get())) {
+                // prepare and run the task
+                boolean threw = true;
+                try {
+                  LOG.debug("Executing task");
+                  task.run(item);
+                  succeeded.add(item);
+                  LOG.debug("Task succeeded");
 
-                threw = false;
+                  threw = false;
 
-              } catch (Exception e) {
-                taskFailed.set(true);
-                exceptions.add(e);
-                LOG.info("Task failed {}", e.toString());
-                LOG.debug("Task failed", e);
+                } catch (Exception e) {
+                  taskFailed.set(true);
+                  exceptions.add(e);
+                  LOG.info("Task failed {}", e.toString());
+                  LOG.debug("Task failed", e);
 
-                if (onFailure != null) {
-                  try {
-                    onFailure.run(item, e);
-                  } catch (Exception failException) {
-                    LOG.warn("Failed to clean up on failure", e);
-                    // swallow the exception
+                  if (onFailure != null) {
+                    try {
+                      onFailure.run(item, e);
+                    } catch (Exception failException) {
+                      LOG.warn("Failed to clean up on failure", e);
+                      // swallow the exception
+                    }
+                  }
+                } finally {
+                  if (threw) {
+                    taskFailed.set(true);
                   }
                 }
-              } finally {
-                if (threw) {
-                  taskFailed.set(true);
+
+              } else if (abortTask != null) {
+                // abort the task instead of running it
+                if (stopAbortsOnFailure && abortFailed.get()) {
+                  return;
+                }
+
+                boolean failed = true;
+                try {
+                  LOG.info("Aborting task");
+                  abortTask.run(item);
+                  failed = false;
+                } catch (Exception e) {
+                  LOG.error("Failed to abort task", e);
+                  // swallow the exception
+                } finally {
+                  if (failed) {
+                    abortFailed.set(true);
+                  }
                 }
               }
-
-            } else if (abortTask != null) {
-              // abort the task instead of running it
-              if (stopAbortsOnFailure && abortFailed.get()) {
-                return;
-              }
-
-              boolean failed = true;
-              try {
-                LOG.info("Aborting task");
-                abortTask.run(item);
-                failed = false;
-              } catch (Exception e) {
-                LOG.error("Failed to abort task", e);
-                // swallow the exception
-              } finally {
-                if (failed) {
-                  abortFailed.set(true);
-                }
-              }
+            } finally {
+              resetStatisticsContext();
             }
           }));
         }
@@ -447,7 +465,6 @@
         // mark as a task failure so all submitted tasks will halt/abort
         taskFailed.set(true);
       }
-
       // let the above tasks complete (or abort)
       waitFor(futures, sleepInterval);
       int futureCount = futures.size();
@@ -464,6 +481,7 @@
             }
 
             boolean failed = true;
+            setStatisticsContext();
             try {
               revertTask.run(item);
               failed = false;
@@ -474,6 +492,7 @@
               if (failed) {
                 revertFailed.set(true);
               }
+              resetStatisticsContext();
             }
           }));
         }
@@ -498,6 +517,26 @@
       // return true if all tasks succeeded.
       return !taskFailed.get();
     }
+
+    /**
+     * Set the statistics context for this thread.
+     */
+    private void setStatisticsContext() {
+      if (ioStatisticsContext != null) {
+        IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
+      }
+    }
+
+    /**
+     * Reset the statistics context if it was set earlier.
+     * This unbinds the current thread from any statistics
+     * context.
+     */
+    private void resetStatisticsContext() {
+      if (ioStatisticsContext != null) {
+        IOStatisticsContext.setThreadIOStatisticsContext(null);
+      }
+    }
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index a1dd4d8..6c39cc4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -31,7 +31,9 @@
 import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.util.functional.RemoteIterators;
@@ -333,7 +335,7 @@
    * Thread safety: None.
    */
   class FileStatusListingIterator
-      implements RemoteIterator<S3AFileStatus>, IOStatisticsSource {
+      implements RemoteIterator<S3AFileStatus>, IOStatisticsSource, Closeable {
 
     /** Source of objects. */
     private final ObjectListingIterator source;
@@ -404,6 +406,14 @@
     }
 
     /**
+     * Close, if called, will update
+     * the thread statistics context with the value.
+     */
+    @Override
+    public void close() {
+      source.close();
+    }
+    /**
      * Try to retrieve another batch.
      * Note that for the initial batch,
      * {@link ObjectListingIterator} does not generate a request;
@@ -545,6 +555,11 @@
 
     private final AuditSpan span;
 
+    /**
+     * Context statistics aggregator.
+     */
+    private final IOStatisticsAggregator aggregator;
+
     /** The most recent listing results. */
     private S3ListResult objects;
 
@@ -601,6 +616,8 @@
       this.span = span;
       this.s3ListResultFuture = listingOperationCallbacks
           .listObjectsAsync(request, iostats, span);
+      this.aggregator = IOStatisticsContext.getCurrentIOStatisticsContext()
+          .getAggregator();
     }
 
     /**
@@ -693,11 +710,12 @@
     }
 
     /**
-     * Close, if actually called, will close the span
-     * this listing was created with.
+     * Close, if called, will update
+     * the thread statistics context with the value.
      */
     @Override
     public void close() {
+      aggregator.aggregate(getIOStatistics());
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index 8b1865c..19943ff 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -41,6 +41,7 @@
 import com.amazonaws.services.s3.model.UploadPartRequest;
 
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -165,6 +166,9 @@
   /** is client side encryption enabled? */
   private final boolean isCSEEnabled;
 
+  /** Thread level IOStatistics Aggregator. */
+  private final IOStatisticsAggregator threadIOStatisticsAggregator;
+
   /**
    * An S3A output stream which uploads partitions in a separate pool of
    * threads; different {@link S3ADataBlocks.BlockFactory}
@@ -201,6 +205,7 @@
       initMultipartUpload();
     }
     this.isCSEEnabled = builder.isCSEEnabled;
+    this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
   }
 
   /**
@@ -454,12 +459,24 @@
    */
   private synchronized void cleanupOnClose() {
     cleanupWithLogger(LOG, getActiveBlock(), blockFactory);
+    mergeThreadIOStatistics(statistics.getIOStatistics());
     LOG.debug("Statistics: {}", statistics);
     cleanupWithLogger(LOG, statistics);
     clearActiveBlock();
   }
 
   /**
+   * Merging the current thread's IOStatistics with the current IOStatistics
+   * context.
+   *
+   * @param streamStatistics Stream statistics to be merged into thread
+   *                         statistics aggregator.
+   */
+  private void mergeThreadIOStatistics(IOStatistics streamStatistics) {
+    getThreadIOStatistics().aggregate(streamStatistics);
+  }
+
+  /**
    * Best effort abort of the multipart upload; sets
    * the field to null afterwards.
    * @return any exception caught during the operation.
@@ -662,6 +679,10 @@
     case StreamCapabilities.ABORTABLE_STREAM:
       return true;
 
+      // IOStatistics context support for thread-level IOStatistics.
+    case StreamCapabilities.IOSTATISTICS_CONTEXT:
+      return true;
+
     default:
       return false;
     }
@@ -702,6 +723,14 @@
   }
 
   /**
+   * Get the IOStatistics aggregator passed in the builder.
+   * @return an aggregator
+   */
+  protected IOStatisticsAggregator getThreadIOStatistics() {
+    return threadIOStatisticsAggregator;
+  }
+
+  /**
    * Multiple partition upload.
    */
   private class MultiPartUpload {
@@ -1092,6 +1121,11 @@
      */
     private PutObjectOptions putOptions;
 
+    /**
+     * thread-level IOStatistics Aggregator.
+     */
+    private IOStatisticsAggregator ioStatisticsAggregator;
+
     private BlockOutputStreamBuilder() {
     }
 
@@ -1108,6 +1142,7 @@
       requireNonNull(putOptions, "null putOptions");
       Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
           "Block size is too small: %s", blockSize);
+      requireNonNull(ioStatisticsAggregator, "null ioStatisticsAggregator");
     }
 
     /**
@@ -1229,5 +1264,17 @@
       putOptions = value;
       return this;
     }
+
+    /**
+     * Set builder value.
+     *
+     * @param value new value
+     * @return the builder
+     */
+    public BlockOutputStreamBuilder withIOStatisticsAggregator(
+        final IOStatisticsAggregator value) {
+      ioStatisticsAggregator = value;
+      return this;
+    }
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 406f40b..c49c368 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -131,6 +131,7 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
 import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource;
@@ -1576,7 +1577,8 @@
         statistics,
         statisticsContext,
         fileStatus,
-        vectoredIOContext)
+        vectoredIOContext,
+        IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator())
         .withAuditSpan(auditSpan);
     openFileHelper.applyDefaultOptions(roc);
     return roc.build();
@@ -1743,7 +1745,9 @@
                 DOWNGRADE_SYNCABLE_EXCEPTIONS,
                 DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
         .withCSEEnabled(isCSEEnabled)
-        .withPutOptions(putOptions);
+        .withPutOptions(putOptions)
+        .withIOStatisticsAggregator(
+            IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
     return new FSDataOutputStream(
         new S3ABlockOutputStream(builder),
         null);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 3069f17..178a807 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -37,6 +37,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.VisibleForTesting;
@@ -53,9 +54,9 @@
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Preconditions;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
 import static java.util.Objects.requireNonNull;
@@ -187,6 +188,9 @@
    */
   private long asyncDrainThreshold;
 
+  /** Aggregator used to aggregate per thread IOStatistics. */
+  private final IOStatisticsAggregator threadIOStatistics;
+
   /**
    * Create the stream.
    * This does not attempt to open it; that is only done on the first
@@ -225,6 +229,7 @@
     this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
     this.unboundedThreadPool = unboundedThreadPool;
     this.vectoredIOContext = context.getVectoredIOContext();
+    this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
   }
 
   /**
@@ -600,7 +605,6 @@
         stopVectoredIOOperations.set(true);
         // close or abort the stream; blocking
         awaitFuture(closeStream("close() operation", false, true));
-        LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
         // end the client+audit span.
         client.close();
         // this is actually a no-op
@@ -608,11 +612,24 @@
       } finally {
         // merge the statistics back into the FS statistics.
         streamStatistics.close();
+        // Collect ThreadLevel IOStats
+        mergeThreadIOStatistics(streamStatistics.getIOStatistics());
       }
     }
   }
 
   /**
+   * Merging the current thread's IOStatistics with the current IOStatistics
+   * context.
+   *
+   * @param streamIOStats Stream statistics to be merged into thread
+   *                      statistics aggregator.
+   */
+  private void mergeThreadIOStatistics(IOStatistics streamIOStats) {
+    threadIOStatistics.aggregate(streamIOStats);
+  }
+
+  /**
    * Close a stream: decide whether to abort or close, based on
    * the length of the stream and the current position.
    * If a close() is attempted and fails, the operation escalates to
@@ -1331,6 +1348,7 @@
   public boolean hasCapability(String capability) {
     switch (toLowerCase(capability)) {
     case StreamCapabilities.IOSTATISTICS:
+    case StreamCapabilities.IOSTATISTICS_CONTEXT:
     case StreamCapabilities.READAHEAD:
     case StreamCapabilities.UNBUFFER:
     case StreamCapabilities.VECTOREDIO:
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index 803b775..bbd86ef 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 
 import javax.annotation.Nullable;
@@ -70,6 +71,9 @@
    */
   private final VectoredIOContext vectoredIOContext;
 
+  /** Thread-level IOStatistics aggregator. **/
+  private final IOStatisticsAggregator ioStatisticsAggregator;
+
   /**
    * Instantiate.
    * @param path path of read
@@ -78,6 +82,7 @@
    * @param instrumentation statistics context
    * @param dstFileStatus target file status
    * @param vectoredIOContext context for vectored read operation.
+   * @param ioStatisticsAggregator IOStatistics aggregator for each thread.
    */
   public S3AReadOpContext(
       final Path path,
@@ -85,11 +90,13 @@
       @Nullable FileSystem.Statistics stats,
       S3AStatisticsContext instrumentation,
       FileStatus dstFileStatus,
-      VectoredIOContext vectoredIOContext) {
+      VectoredIOContext vectoredIOContext,
+      IOStatisticsAggregator ioStatisticsAggregator) {
     super(invoker, stats, instrumentation,
         dstFileStatus);
     this.path = requireNonNull(path);
     this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
+    this.ioStatisticsAggregator = ioStatisticsAggregator;
   }
 
   /**
@@ -105,6 +112,7 @@
         "invalid readahead %d", readahead);
     Preconditions.checkArgument(asyncDrainThreshold >= 0,
         "invalid drainThreshold %d", asyncDrainThreshold);
+    requireNonNull(ioStatisticsAggregator, "ioStatisticsAggregator");
     return this;
   }
 
@@ -215,6 +223,15 @@
     return vectoredIOContext;
   }
 
+  /**
+   * Return the IOStatistics aggregator.
+   *
+   * @return instance of IOStatisticsAggregator.
+   */
+  public IOStatisticsAggregator getIOStatisticsAggregator() {
+    return ioStatisticsAggregator;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
index 78b687c..d6044ed 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.audit.AuditConstants;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
@@ -467,8 +468,7 @@
    *
    * While the classic committers create a 0-byte file, the S3A committers
    * PUT up a the contents of a {@link SuccessData} file.
-   *
-   * @param context job context
+   * @param commitContext commit context
    * @param pending the pending commits
    *
    * @return the success data, even if the marker wasn't created
@@ -476,7 +476,7 @@
    * @throws IOException IO failure
    */
   protected SuccessData maybeCreateSuccessMarkerFromCommits(
-      JobContext context,
+      final CommitContext commitContext,
       ActiveCommit pending) throws IOException {
     List<String> filenames = new ArrayList<>(pending.size());
     // The list of committed objects in pending is size limited in
@@ -488,7 +488,13 @@
     // and the current statistics
     snapshot.aggregate(getIOStatistics());
 
-    return maybeCreateSuccessMarker(context, filenames, snapshot);
+    // and include the context statistics if enabled
+    if (commitContext.isCollectIOStatistics()) {
+      snapshot.aggregate(commitContext.getIOStatisticsContext()
+              .getIOStatistics());
+    }
+
+    return maybeCreateSuccessMarker(commitContext.getJobContext(), filenames, snapshot);
   }
 
   /**
@@ -729,6 +735,7 @@
       final FileStatus status) throws IOException {
 
     final Path path = status.getPath();
+    commitContext.switchToIOStatisticsContext();
     try (DurationInfo ignored =
              new DurationInfo(LOG,
                  "Loading and committing files in pendingset %s", path)) {
@@ -775,6 +782,7 @@
       final FileStatus status) throws IOException {
 
     final Path path = status.getPath();
+    commitContext.switchToIOStatisticsContext();
     try (DurationInfo ignored =
              new DurationInfo(LOG, false, "Committing %s", path)) {
       PendingSet pendingSet = PersistentCommitData.load(
@@ -806,6 +814,7 @@
       final boolean deleteRemoteFiles) throws IOException {
 
     final Path path = status.getPath();
+    commitContext.switchToIOStatisticsContext();
     try (DurationInfo ignored =
              new DurationInfo(LOG, false, "Aborting %s", path)) {
       PendingSet pendingSet = PersistentCommitData.load(
@@ -832,6 +841,8 @@
 
   /**
    * Start the final job commit/abort commit operations.
+   * If configured to collect statistics,
+   * The IO StatisticsContext is reset.
    * @param context job context
    * @return a commit context through which the operations can be invoked.
    * @throws IOException failure.
@@ -840,14 +851,22 @@
       final JobContext context)
       throws IOException {
 
-    return getCommitOperations().createCommitContext(
+    IOStatisticsContext ioStatisticsContext =
+        IOStatisticsContext.getCurrentIOStatisticsContext();
+    CommitContext commitContext = getCommitOperations().createCommitContext(
         context,
         getOutputPath(),
-        getJobCommitThreadCount(context));
+        getJobCommitThreadCount(context),
+        ioStatisticsContext);
+    commitContext.maybeResetIOStatisticsContext();
+    return commitContext;
   }
+
   /**
    * Start a ask commit/abort commit operations.
    * This may have a different thread count.
+   * If configured to collect statistics,
+   * The IO StatisticsContext is reset.
    * @param context job or task context
    * @return a commit context through which the operations can be invoked.
    * @throws IOException failure.
@@ -856,10 +875,13 @@
       final JobContext context)
       throws IOException {
 
-    return getCommitOperations().createCommitContext(
+    CommitContext commitContext = getCommitOperations().createCommitContext(
         context,
         getOutputPath(),
-        getTaskCommitThreadCount(context));
+        getTaskCommitThreadCount(context),
+        IOStatisticsContext.getCurrentIOStatisticsContext());
+    commitContext.maybeResetIOStatisticsContext();
+    return commitContext;
   }
 
   /**
@@ -1014,7 +1036,7 @@
       stage = "completed";
       jobCompleted(true);
       stage = "marker";
-      successData = maybeCreateSuccessMarkerFromCommits(context, pending);
+      successData = maybeCreateSuccessMarkerFromCommits(commitContext, pending);
       stage = "cleanup";
       cleanup(commitContext, false);
     } catch (IOException e) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
index b85ce27..6e2a5d8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -354,4 +354,24 @@
   public static final String OPT_SUMMARY_REPORT_DIR =
       OPT_PREFIX + "summary.report.directory";
 
+  /**
+   * Experimental feature to collect thread level IO statistics.
+   * When set the committers will reset the statistics in
+   * task setup and propagate to the job committer.
+   * The job comitter will include those and its own statistics.
+   * Do not use if the execution engine is collecting statistics,
+   * as the multiple reset() operations will result in incomplete
+   * statistics.
+   * Value: {@value}.
+   */
+  public static final String S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS =
+      OPT_PREFIX + "experimental.collect.iostatistics";
+
+  /**
+   * Default value for {@link #S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS}.
+   * Value: {@value}.
+   */
+  public static final boolean S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT =
+      false;
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
index 8bff165..8ac3dcb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitContext.java
@@ -21,6 +21,7 @@
 import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -47,6 +49,8 @@
 
 import static org.apache.hadoop.fs.s3a.Constants.THREAD_POOL_SHUTDOWN_DELAY_SECONDS;
 import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.THREAD_PREFIX;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.THREAD_KEEP_ALIVE_TIME;
 
 /**
@@ -124,23 +128,42 @@
   private final int committerThreads;
 
   /**
+   * Should IOStatistics be collected by the committer?
+   */
+  private final boolean collectIOStatistics;
+
+  /**
+   * IOStatisticsContext to switch to in all threads
+   * taking part in the commit operation.
+   * This ensures that the IOStatistics collected in the
+   * worker threads will be aggregated into the total statistics
+   * of the thread calling the committer commit/abort methods.
+   */
+  private final IOStatisticsContext ioStatisticsContext;
+
+  /**
    * Create.
    * @param commitOperations commit callbacks
    * @param jobContext job context
    * @param committerThreads number of commit threads
+   * @param ioStatisticsContext IOStatistics context of current thread
    */
   public CommitContext(
       final CommitOperations commitOperations,
       final JobContext jobContext,
-      final int committerThreads) {
+      final int committerThreads,
+      final IOStatisticsContext ioStatisticsContext) {
     this.commitOperations = commitOperations;
     this.jobContext = jobContext;
     this.conf = jobContext.getConfiguration();
     this.jobId = jobContext.getJobID().toString();
+    this.collectIOStatistics = conf.getBoolean(
+        S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS,
+        S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS_DEFAULT);
+    this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext);
     this.auditContextUpdater = new AuditContextUpdater(jobContext);
     this.auditContextUpdater.updateCurrentAuditContext();
     this.committerThreads = committerThreads;
-
     buildSubmitters();
   }
 
@@ -152,15 +175,19 @@
    * @param conf job conf
    * @param jobId ID
    * @param committerThreads number of commit threads
+   * @param ioStatisticsContext IOStatistics context of current thread
    */
   public CommitContext(final CommitOperations commitOperations,
       final Configuration conf,
       final String jobId,
-      final int committerThreads) {
+      final int committerThreads,
+      final IOStatisticsContext ioStatisticsContext) {
     this.commitOperations = commitOperations;
     this.jobContext = null;
     this.conf = conf;
     this.jobId = jobId;
+    this.collectIOStatistics = false;
+    this.ioStatisticsContext = Objects.requireNonNull(ioStatisticsContext);
     this.auditContextUpdater = new AuditContextUpdater(jobId);
     this.auditContextUpdater.updateCurrentAuditContext();
     this.committerThreads = committerThreads;
@@ -359,6 +386,44 @@
   }
 
   /**
+   * Collecting thread level IO statistics?
+   * @return true if thread level IO stats should be collected.
+   */
+  public boolean isCollectIOStatistics() {
+    return collectIOStatistics;
+  }
+
+  /**
+   * IOStatistics context of the created thread.
+   * @return the IOStatistics.
+   */
+  public IOStatisticsContext getIOStatisticsContext() {
+    return ioStatisticsContext;
+  }
+
+  /**
+   * Switch to the context IOStatistics context,
+   * if needed.
+   */
+  public void switchToIOStatisticsContext() {
+    IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
+  }
+
+  /**
+   * Reset the IOStatistics context if statistics are being
+   * collected.
+   * Logs at info.
+   */
+  public void maybeResetIOStatisticsContext() {
+    if (collectIOStatistics) {
+
+      LOG.info("Resetting IO statistics context {}",
+          ioStatisticsContext.getID());
+      ioStatisticsContext.reset();
+    }
+  }
+
+  /**
    * Submitter for a given thread pool.
    */
   private final class PoolSubmitter implements TaskPool.Submitter, Closeable {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
index 0772e14..eb23f29 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
@@ -62,6 +62,7 @@
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.Preconditions;
@@ -639,15 +640,18 @@
    * @param context job context
    * @param path path for all work.
    * @param committerThreads thread pool size
+   * @param ioStatisticsContext IOStatistics context of current thread
    * @return the commit context to pass in.
    * @throws IOException failure.
    */
   public CommitContext createCommitContext(
       JobContext context,
       Path path,
-      int committerThreads) throws IOException {
+      int committerThreads,
+      IOStatisticsContext ioStatisticsContext) throws IOException {
     return new CommitContext(this, context,
-        committerThreads);
+        committerThreads,
+        ioStatisticsContext);
   }
 
   /**
@@ -668,7 +672,8 @@
     return new CommitContext(this,
         getStoreContext().getConfiguration(),
         id,
-        committerThreads);
+        committerThreads,
+        IOStatisticsContext.getCurrentIOStatisticsContext());
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
index 007e9b3..9ded64e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicS3GuardCommitter.java
@@ -219,6 +219,13 @@
       }
       pendingSet.putExtraData(TASK_ATTEMPT_ID, taskId);
       pendingSet.setJobId(jobId);
+      // add in the IOStatistics of all the file loading
+      if (commitContext.isCollectIOStatistics()) {
+        pendingSet.getIOStatistics()
+            .aggregate(
+                commitContext.getIOStatisticsContext().getIOStatistics());
+      }
+
       Path jobAttemptPath = getJobAttemptPath(context);
       TaskAttemptID taskAttemptID = context.getTaskAttemptID();
       Path taskOutcomePath = new Path(jobAttemptPath,
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 36eae01..d764055 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -696,6 +696,13 @@
           pendingCommits.add(commit);
         }
 
+        // maybe add in the IOStatistics the thread
+        if (commitContext.isCollectIOStatistics()) {
+          pendingCommits.getIOStatistics().aggregate(
+              commitContext.getIOStatisticsContext()
+              .getIOStatistics());
+        }
+
         // save the data
         // overwrite any existing file, so whichever task attempt
         // committed last wins.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
index 213f944..e90ad8b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import org.apache.hadoop.fs.s3a.tools.MarkerTool;
 import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 import org.apache.hadoop.fs.store.audit.AuditSpanSource;
 import org.apache.hadoop.io.IOUtils;
@@ -38,6 +39,7 @@
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
@@ -67,6 +69,15 @@
   private AuditSpanSource spanSource;
 
   /**
+   * Atomic references to be used to re-throw an Exception or an ASE
+   * caught inside a lambda function.
+   */
+  private static final AtomicReference<Exception> FUTURE_EXCEPTION =
+      new AtomicReference<>();
+  private static final AtomicReference<AssertionError> FUTURE_ASE =
+      new AtomicReference<>();
+
+  /**
    * Get the source.
    * @return span source
    */
@@ -99,6 +110,9 @@
     S3AFileSystem.initializeClass();
     super.setup();
     setSpanSource(getFileSystem());
+    // Reset the current context's thread IOStatistics.`
+    // this ensures that the context stats will always be from the test case
+    IOStatisticsContext.getCurrentIOStatisticsContext().reset();
   }
 
   @Override
@@ -263,4 +277,53 @@
     Assume.assumeTrue("Skipping test if CSE is enabled",
         !getFileSystem().isCSEEnabled());
   }
+
+  /**
+   * If an exception is caught while doing some work in a Lambda function,
+   * store it in an atomic reference to be thrown later on.
+   * @param exception Exception caught.
+   */
+  public static void setFutureException(Exception exception) {
+    FUTURE_EXCEPTION.set(exception);
+  }
+
+  /**
+   * If an Assertion is caught while doing some work in a Lambda function,
+   * store it in an atomic reference to be thrown later on.
+   *
+   * @param ase Assertion Error caught.
+   */
+  public static void setFutureAse(AssertionError ase) {
+    FUTURE_ASE.set(ase);
+  }
+
+  /**
+   * throw the caught exception from the atomic reference and also clear the
+   * atomic reference so that we don't rethrow in another test.
+   *
+   * @throws Exception the exception caught.
+   */
+  public static void maybeReThrowFutureException() throws Exception {
+    if (FUTURE_EXCEPTION.get() != null) {
+      Exception exceptionToThrow = FUTURE_EXCEPTION.get();
+      // reset the atomic ref before throwing.
+      setFutureAse(null);
+      throw exceptionToThrow;
+    }
+  }
+
+  /**
+   * throw the Assertion error from the atomic reference and also clear the
+   * atomic reference so that we don't rethrow in another test.
+   *
+   * @throws Exception Assertion error caught.
+   */
+  public static void maybeReThrowFutureASE() throws Exception {
+    if (FUTURE_ASE.get() != null) {
+      AssertionError aseToThrow = FUTURE_ASE.get();
+      // reset the atomic ref before throwing.
+      setFutureAse(null);
+      throw aseToThrow;
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
new file mode 100644
index 0000000..19c40c6
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -0,0 +1,487 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsContextImpl;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.functional.CloseableTaskPoolSubmitter;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.enableIOStatisticsContext;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getCurrentIOStatisticsContext;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.getThreadSpecificIOStatisticsContext;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsContextIntegration.setThreadIOStatisticsContext;
+import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
+
+/**
+ * Tests to verify the Thread-level IOStatistics.
+ */
+public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
+
+  private static final int SMALL_THREADS = 2;
+  private static final int BYTES_BIG = 100;
+  private static final int BYTES_SMALL = 50;
+  private static final String[] IOSTATISTICS_CONTEXT_CAPABILITY =
+      new String[] {StreamCapabilities.IOSTATISTICS_CONTEXT};
+  private ExecutorService executor;
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration configuration = super.createConfiguration();
+    enableIOStatisticsContext();
+    return configuration;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    if (executor != null) {
+      executor.shutdown();
+    }
+    super.teardown();
+  }
+
+  /**
+   * Verify that S3AInputStream aggregates per thread IOStats collection
+   * correctly.
+   */
+  @Test
+  public void testS3AInputStreamIOStatisticsContext()
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] data = dataset(256, 'a', 'z');
+    byte[] readDataFirst = new byte[BYTES_BIG];
+    byte[] readDataSecond = new byte[BYTES_SMALL];
+    writeDataset(fs, path, data, data.length, 1024, true);
+
+    CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+    try {
+
+      for (int i = 0; i < SMALL_THREADS; i++) {
+        executor.submit(() -> {
+          try {
+            // get the thread context and reset
+            IOStatisticsContext context =
+                getAndResetThreadStatisticsContext();
+            try (FSDataInputStream in = fs.open(path)) {
+              // Assert the InputStream's stream capability to support
+              // IOStatisticsContext.
+              assertCapabilities(in, IOSTATISTICS_CONTEXT_CAPABILITY, null);
+              in.seek(50);
+              in.read(readDataFirst);
+            }
+            assertContextBytesRead(context, BYTES_BIG);
+            // Stream is closed for a thread. Re-open and do more operations.
+            try (FSDataInputStream in = fs.open(path)) {
+              in.seek(100);
+              in.read(readDataSecond);
+            }
+            assertContextBytesRead(context, BYTES_BIG + BYTES_SMALL);
+
+            latch.countDown();
+          } catch (Exception e) {
+            latch.countDown();
+            setFutureException(e);
+            LOG.error("An error occurred while doing a task in the thread", e);
+          } catch (AssertionError ase) {
+            latch.countDown();
+            setFutureAse(ase);
+            throw ase;
+          }
+        });
+      }
+      // wait for tasks to finish.
+      latch.await();
+    } finally {
+      executor.shutdown();
+    }
+
+    // Check if an Exception or ASE was caught while the test threads were running.
+    maybeReThrowFutureException();
+    maybeReThrowFutureASE();
+
+  }
+
+  /**
+   * get the thread context and reset.
+   * @return thread context
+   */
+  private static IOStatisticsContext getAndResetThreadStatisticsContext() {
+    IOStatisticsContext context =
+        IOStatisticsContext.getCurrentIOStatisticsContext();
+    context.reset();
+    return context;
+  }
+
+  /**
+   * Verify that S3ABlockOutputStream aggregates per thread IOStats collection
+   * correctly.
+   */
+  @Test
+  public void testS3ABlockOutputStreamIOStatisticsContext()
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] writeDataFirst = new byte[BYTES_BIG];
+    byte[] writeDataSecond = new byte[BYTES_SMALL];
+
+    final ExecutorService executorService =
+        HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+    CountDownLatch latch = new CountDownLatch(SMALL_THREADS);
+
+    try {
+      for (int i = 0; i < SMALL_THREADS; i++) {
+        executorService.submit(() -> {
+          try {
+            // get the thread context and reset
+            IOStatisticsContext context =
+                getAndResetThreadStatisticsContext();
+            try (FSDataOutputStream out = fs.create(path)) {
+              // Assert the OutputStream's stream capability to support
+              // IOStatisticsContext.
+              assertCapabilities(out, IOSTATISTICS_CONTEXT_CAPABILITY, null);
+              out.write(writeDataFirst);
+            }
+            assertContextBytesWrite(context, BYTES_BIG);
+
+            // Stream is closed for a thread. Re-open and do more operations.
+            try (FSDataOutputStream out = fs.create(path)) {
+              out.write(writeDataSecond);
+            }
+            assertContextBytesWrite(context, BYTES_BIG + BYTES_SMALL);
+            latch.countDown();
+          } catch (Exception e) {
+            latch.countDown();
+            setFutureException(e);
+            LOG.error("An error occurred while doing a task in the thread", e);
+          } catch (AssertionError ase) {
+            latch.countDown();
+            setFutureAse(ase);
+            throw ase;
+          }
+        });
+      }
+      // wait for tasks to finish.
+      latch.await();
+    } finally {
+      executorService.shutdown();
+    }
+
+    // Check if an Excp or ASE was caught while the test threads were running.
+    maybeReThrowFutureException();
+    maybeReThrowFutureASE();
+  }
+
+  /**
+   * Verify stats collection and aggregation for constructor thread, Junit
+   * thread and a worker thread.
+   */
+  @Test
+  public void testThreadIOStatisticsForDifferentThreads()
+      throws IOException, InterruptedException {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] data = new byte[BYTES_BIG];
+    long threadIdForTest = Thread.currentThread().getId();
+    IOStatisticsContext context =
+        getAndResetThreadStatisticsContext();
+    Assertions.assertThat(((IOStatisticsContextImpl)context).getThreadID())
+        .describedAs("Thread ID of %s", context)
+        .isEqualTo(threadIdForTest);
+    Assertions.assertThat(((IOStatisticsContextImpl)context).getID())
+        .describedAs("ID of %s", context)
+        .isGreaterThan(0);
+
+    // Write in the Junit thread.
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.write(data);
+    }
+
+    // Read in the Junit thread.
+    try (FSDataInputStream in = fs.open(path)) {
+      in.read(data);
+    }
+
+    // Worker thread work and wait for it to finish.
+    TestWorkerThread workerThread = new TestWorkerThread(path, null);
+    long workerThreadID = workerThread.getId();
+    workerThread.start();
+    workerThread.join();
+
+    assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG);
+    assertThreadStatisticsForThread(workerThreadID, BYTES_SMALL);
+  }
+
+  /**
+   * Verify stats collection and aggregation for constructor thread, Junit
+   * thread and a worker thread.
+   */
+  @Test
+  public void testThreadSharingIOStatistics()
+      throws IOException, InterruptedException {
+    S3AFileSystem fs = getFileSystem();
+    Path path = path(getMethodName());
+    byte[] data = new byte[BYTES_BIG];
+    long threadIdForTest = Thread.currentThread().getId();
+    IOStatisticsContext context =
+        getAndResetThreadStatisticsContext();
+
+
+    // Write in the Junit thread.
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.write(data);
+    }
+
+    // Read in the Junit thread.
+    try (FSDataInputStream in = fs.open(path)) {
+      in.read(data);
+    }
+
+    // Worker thread will share the same context.
+    TestWorkerThread workerThread = new TestWorkerThread(path, context);
+    long workerThreadID = workerThread.getId();
+    workerThread.start();
+    workerThread.join();
+
+    assertThreadStatisticsForThread(threadIdForTest, BYTES_BIG + BYTES_SMALL);
+
+  }
+
+  /**
+   * Test to verify if setting the current IOStatisticsContext removes the
+   * current context and creates a new instance of it.
+   */
+  @Test
+  public void testSettingNullIOStatisticsContext() {
+    IOStatisticsContext ioStatisticsContextBefore =
+        getCurrentIOStatisticsContext();
+    // Set the current IOStatisticsContext to null, which should remove the
+    // context and set a new one.
+    setThreadIOStatisticsContext(null);
+    // Get the context again after setting.
+    IOStatisticsContext ioStatisticsContextAfter =
+        getCurrentIOStatisticsContext();
+    //Verify the context ID after setting to null is different than the previous
+    // one.
+    Assertions.assertThat(ioStatisticsContextBefore.getID())
+        .describedAs("A new IOStaticsContext should be set after setting the "
+            + "current to null")
+        .isNotEqualTo(ioStatisticsContextAfter.getID());
+  }
+
+  /**
+   * Assert bytes written by the statistics context.
+   *
+   * @param context statistics context.
+   * @param bytes expected bytes.
+   */
+  private void assertContextBytesWrite(IOStatisticsContext context,
+      int bytes) {
+    verifyStatisticCounterValue(
+        context.getIOStatistics(),
+        STREAM_WRITE_BYTES,
+        bytes);
+  }
+
+  /**
+   * Assert bytes read by the statistics context.
+   *
+   * @param context statistics context.
+   * @param readBytes expected bytes.
+   */
+  private void assertContextBytesRead(IOStatisticsContext context,
+      int readBytes) {
+    verifyStatisticCounterValue(
+        context.getIOStatistics(),
+        STREAM_READ_BYTES,
+        readBytes);
+  }
+
+  /**
+   * Assert fixed bytes wrote and read for a particular thread ID.
+   *
+   * @param testThreadId                thread ID.
+   * @param expectedBytesWrittenAndRead expected bytes.
+   */
+  private void assertThreadStatisticsForThread(long testThreadId,
+      int expectedBytesWrittenAndRead) {
+    LOG.info("Thread ID to be asserted: {}", testThreadId);
+    IOStatisticsContext ioStatisticsContext =
+        getThreadSpecificIOStatisticsContext(testThreadId);
+    Assertions.assertThat(ioStatisticsContext)
+        .describedAs("IOStatisticsContext for %d", testThreadId)
+        .isNotNull();
+
+
+    IOStatistics ioStatistics = ioStatisticsContext.snapshot();
+
+
+    assertThatStatisticCounter(ioStatistics,
+            STREAM_WRITE_BYTES)
+        .describedAs("Bytes written are not as expected for thread : %s",
+                testThreadId)
+        .isEqualTo(expectedBytesWrittenAndRead);
+
+    assertThatStatisticCounter(ioStatistics,
+        STREAM_READ_BYTES)
+        .describedAs("Bytes read are not as expected for thread : %s",
+                testThreadId)
+        .isEqualTo(expectedBytesWrittenAndRead);
+  }
+
+  @Test
+  public void testListingStatisticsContext() throws Throwable {
+    describe("verify the list operations update on close()");
+
+    S3AFileSystem fs = getFileSystem();
+    Path path = methodPath();
+    fs.mkdirs(methodPath());
+
+    // after all setup, get the reset context
+    IOStatisticsContext context =
+        getAndResetThreadStatisticsContext();
+    IOStatistics ioStatistics = context.getIOStatistics();
+
+    fs.listStatus(path);
+    verifyStatisticCounterValue(ioStatistics,
+        StoreStatisticNames.OBJECT_LIST_REQUEST,
+        1);
+
+    context.reset();
+    foreach(fs.listStatusIterator(path), i -> {});
+    verifyStatisticCounterValue(ioStatistics,
+        StoreStatisticNames.OBJECT_LIST_REQUEST,
+        1);
+
+    context.reset();
+    foreach(fs.listLocatedStatus(path), i -> {});
+    verifyStatisticCounterValue(ioStatistics,
+        StoreStatisticNames.OBJECT_LIST_REQUEST,
+        1);
+
+    context.reset();
+    foreach(fs.listFiles(path, true), i -> {});
+    verifyStatisticCounterValue(ioStatistics,
+        StoreStatisticNames.OBJECT_LIST_REQUEST,
+        1);
+  }
+
+  @Test
+  public void testListingThroughTaskPool() throws Throwable {
+    describe("verify the list operations are updated through taskpool");
+
+    S3AFileSystem fs = getFileSystem();
+    Path path = methodPath();
+    fs.mkdirs(methodPath());
+
+    // after all setup, get the reset context
+    IOStatisticsContext context =
+        getAndResetThreadStatisticsContext();
+    IOStatistics ioStatistics = context.getIOStatistics();
+
+    CloseableTaskPoolSubmitter submitter =
+        new CloseableTaskPoolSubmitter(executor);
+    TaskPool.foreach(fs.listStatusIterator(path))
+        .executeWith(submitter)
+        .run(i -> {});
+
+    verifyStatisticCounterValue(ioStatistics,
+        StoreStatisticNames.OBJECT_LIST_REQUEST,
+        1);
+
+  }
+
+  /**
+   * Simulating doing some work in a separate thread.
+   * If constructed with an IOStatisticsContext then
+   * that context is switched to before performing the IO.
+   */
+  private class TestWorkerThread extends Thread implements Runnable {
+    private final Path workerThreadPath;
+
+    private final IOStatisticsContext ioStatisticsContext;
+
+    /**
+     * create.
+     * @param workerThreadPath thread path.
+     * @param ioStatisticsContext optional statistics context     *
+     */
+    TestWorkerThread(
+        final Path workerThreadPath,
+        final IOStatisticsContext ioStatisticsContext) {
+      this.workerThreadPath = workerThreadPath;
+      this.ioStatisticsContext = ioStatisticsContext;
+    }
+
+    @Override
+    public void run() {
+      S3AFileSystem fs = getFileSystem();
+      byte[] data = new byte[BYTES_SMALL];
+
+      // maybe switch context
+      if (ioStatisticsContext != null) {
+        IOStatisticsContext.setThreadIOStatisticsContext(ioStatisticsContext);
+      }
+
+      // Write in the worker thread.
+      try (FSDataOutputStream out = fs.create(workerThreadPath)) {
+        out.write(data);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failure while writing", e);
+      }
+
+      //Read in the worker thread.
+      try (FSDataInputStream in = fs.open(workerThreadPath)) {
+        in.read(data);
+      } catch (IOException e) {
+        throw new UncheckedIOException("Failure while reading", e);
+      }
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
index 08f4f8b..d38b5c9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockOutputStream.java
@@ -24,6 +24,7 @@
 import org.apache.hadoop.fs.s3a.commit.PutTracker;
 import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 import org.apache.hadoop.util.Progressable;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,7 +68,11 @@
             .withProgress(progressable)
             .withPutTracker(putTracker)
             .withWriteOperations(oHelper)
-            .withPutOptions(PutObjectOptions.keepingDirs());
+            .withPutOptions(PutObjectOptions.keepingDirs())
+            .withIOStatisticsAggregator(
+                IOStatisticsContext.getCurrentIOStatisticsContext()
+                    .getAggregator());
+
     return builder;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 9d9000c..9987257 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -26,6 +26,7 @@
 import java.util.List;
 
 import org.assertj.core.api.Assertions;
+import org.junit.AfterClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,6 +39,9 @@
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import org.apache.hadoop.fs.statistics.IOStatisticsSnapshot;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -67,6 +71,12 @@
       LoggerFactory.getLogger(AbstractCommitITest.class);
 
   /**
+   * Job statistics accrued across all test cases.
+   */
+  private static final IOStatisticsSnapshot JOB_STATISTICS =
+      IOStatisticsSupport.snapshotIOStatistics();
+
+  /**
    * Helper class for commit operations and assertions.
    */
   private CommitterTestHelper testHelper;
@@ -92,7 +102,8 @@
         FS_S3A_COMMITTER_NAME,
         FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
         FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
-        FAST_UPLOAD_BUFFER);
+        FAST_UPLOAD_BUFFER,
+        S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS);
 
     conf.setBoolean(MAGIC_COMMITTER_ENABLED, DEFAULT_MAGIC_COMMITTER_ENABLED);
     conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
@@ -100,9 +111,15 @@
     conf.set(FAST_UPLOAD_BUFFER, FAST_UPLOAD_BUFFER_ARRAY);
     // and bind the report dir
     conf.set(OPT_SUMMARY_REPORT_DIR, reportDir.toURI().toString());
+    conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, true);
     return conf;
   }
 
+  @AfterClass
+  public static void printStatistics() {
+    LOG.info("Aggregate job statistics {}\n",
+        IOStatisticsLogging.ioStatisticsToPrettyString(JOB_STATISTICS));
+  }
   /**
    * Get the log; can be overridden for test case log.
    * @return a log.
@@ -397,6 +414,8 @@
 
   /**
    * Load a success file; fail if the file is empty/nonexistent.
+   * The statistics in {@link #JOB_STATISTICS} are updated with
+   * the statistics from the success file
    * @param fs filesystem
    * @param outputPath directory containing the success file.
    * @param origin origin of the file
@@ -426,6 +445,8 @@
     String body = ContractTestUtils.readUTF8(fs, success, -1);
     LOG.info("Loading committer success file {}. Actual contents=\n{}", success,
         body);
-    return SuccessData.load(fs, success);
+    SuccessData successData = SuccessData.load(fs, success);
+    JOB_STATISTICS.aggregate(successData.getIOStatistics());
+    return successData;
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
index b92605c..439ef9a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
+import org.apache.hadoop.fs.statistics.IOStatisticsContext;
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
@@ -92,7 +93,8 @@
     // this is done by calling the preCommit method directly,
 
     final CommitContext commitContext = new CommitOperations(getWrapperFS()).
-        createCommitContext(getJob(), getOutputPath(), 0);
+        createCommitContext(getJob(), getOutputPath(), 0,
+            IOStatisticsContext.getCurrentIOStatisticsContext());
     committer.preCommitJob(commitContext, AbstractS3ACommitter.ActiveCommit.empty());
 
     reset(mockFS);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
index 3685766..b19662c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
@@ -37,6 +37,7 @@
 
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS;
 
 /** ITest of the low level protocol methods. */
 public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
@@ -52,6 +53,14 @@
   }
 
   @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    // turn off stats collection to verify that it works
+    conf.setBoolean(S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS, false);
+    return  conf;
+  }
+
+  @Override
   protected AbstractS3ACommitter createCommitter(
       Path outputPath,
       TaskAttemptContext context)