HADOOP-19737: ABFS: Add metrics to identify improvements with read and write aggressiveness (#8056)

Contributed by Anmol Asrani
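
As a quick reference for reviewers, the sketch below shows how the two new write memory-usage
thresholds introduced by this patch could be tuned from client configuration. Only the two
property names and their defaults (60 and 30 percent) come from ConfigurationKeys and
FileSystemConfigurations in this change; the account URI, the chosen values, and the example
class itself are illustrative only and not part of the patch.

    // Hypothetical tuning example; only the two property names below are defined by this patch.
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WriteMemoryThresholdTuningExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Scale the write thread pool down once heap usage exceeds 70% (default 60).
        conf.setInt("fs.azure.write.high.memory.usage.threshold.percent", 70);
        // Allow the pool to scale back up only while heap usage stays below 25% (default 30).
        conf.setInt("fs.azure.write.low.memory.usage.threshold.percent", 25);
        // Placeholder account URI; an ABFS filesystem picks the thresholds up from its configuration.
        URI fsUri = new URI("abfs://container@account.dfs.core.windows.net/");
        try (FileSystem fs = FileSystem.get(fsUri, conf)) {
          fs.create(new Path("/tmp/threshold-tuning-example.txt")).close();
        }
      }
    }

With these set, WriteThreadPoolSizeManager divides the configured percentages by 100 and compares
them against the JVM memory load (used/max) when deciding whether to grow or shrink the pool.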
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index f18a2a6..33573b8 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -63,6 +63,12 @@
     <!-- allow tests to use _ for ordering. -->
     <suppress checks="MethodName"
               files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
-     <suppress checks="ParameterNumber"
-       files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
+  <suppress checks="ParameterNumber"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
+  <suppress checks="ParameterNumber"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ReadBufferManagerV2.java"/>
+  <suppress checks="ParameterNumber"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]WriteThreadPoolSizeManager.java"/>
+  <suppress checks="ParameterNumber"
+              files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ResourceUtilizationStats.java"/>
 </suppressions>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index e4c88b2..e759129 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -545,7 +545,6 @@ public class AbfsConfiguration{
   private int writeMediumCpuThreshold;
 
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT,
-      MinValue = MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT,
       MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT,
       DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT)
   private int writeLowCpuThreshold;
@@ -566,6 +565,16 @@ public class AbfsConfiguration{
   private int highTierMemoryMultiplier;
 
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT,
+      DefaultValue = DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT)
+  private int writeHighMemoryUsageThresholdPercent;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT,
+      DefaultValue = DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT)
+  private int writeLowMemoryUsageThresholdPercent;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
       FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
       MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue = MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
   private int apacheMaxCacheSize;
@@ -1917,6 +1926,14 @@ public int getHighTierMemoryMultiplier() {
     return highTierMemoryMultiplier;
   }
 
+  public int getWriteHighMemoryUsageThresholdPercent() {
+    return writeHighMemoryUsageThresholdPercent;
+  }
+
+  public int getWriteLowMemoryUsageThresholdPercent() {
+    return writeLowMemoryUsageThresholdPercent;
+  }
+
   public int getMaxWriteRequestsToQueue() {
     if (this.maxWriteRequestsToQueue < 1) {
       return 2 * getWriteConcurrentRequestCount();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index 7a94117..8bc7d50 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.fs.azurebfs.services.AbfsBackoffMetrics;
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
 import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadResourceUtilizationMetrics;
+import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -106,6 +108,10 @@ public class AbfsCountersImpl implements AbfsCounters {
 
   private AbfsReadFooterMetrics abfsReadFooterMetrics = null;
 
+  private AbfsWriteResourceUtilizationMetrics abfsWriteResourceUtilizationMetrics = null;
+
+  private AbfsReadResourceUtilizationMetrics abfsReadResourceUtilizationMetrics = null;
+
   private AtomicLong lastExecutionTime = null;
 
   private static final AbfsStatistic[] STATISTIC_LIST = {
@@ -169,6 +175,31 @@ public AbfsCountersImpl(URI uri) {
     lastExecutionTime = new AtomicLong(now());
   }
 
+  /**
+   * Initializes the metrics collector for the read thread pool.
+   * <p>
+   * This method creates a new instance of {@link AbfsReadResourceUtilizationMetrics}
+   * to track performance statistics and operational metrics related to
+   * read operations executed by the thread pool.
+   * </p>
+   */
+  public void initializeReadResourceUtilizationMetrics() {
+    abfsReadResourceUtilizationMetrics = new AbfsReadResourceUtilizationMetrics();
+  }
+
+  /**
+   * Initializes the metrics collector for the write thread pool.
+   * <p>
+   * This method creates a new instance of {@link AbfsWriteResourceUtilizationMetrics}
+   * to track performance statistics and operational metrics related to
+   * write operations executed by the thread pool.
+   * </p>
+   */
+  public void initializeWriteResourceUtilizationMetrics() {
+    abfsWriteResourceUtilizationMetrics = new AbfsWriteResourceUtilizationMetrics();
+  }
+
+
   @Override
   public void initializeMetrics(MetricFormat metricFormat) {
     switch (metricFormat) {
@@ -268,6 +299,22 @@ public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
   }
 
   /**
+   * Returns the write thread pool metrics instance, or {@code null} if uninitialized.
+   */
+  @Override
+  public AbfsWriteResourceUtilizationMetrics getAbfsWriteResourceUtilizationMetrics() {
+    return abfsWriteResourceUtilizationMetrics;
+  }
+
+  /**
+   * Returns the read thread pool metrics instance, or {@code null} if uninitialized.
+   */
+  @Override
+  public AbfsReadResourceUtilizationMetrics getAbfsReadResourceUtilizationMetrics() {
+    return abfsReadResourceUtilizationMetrics;
+  }
+
+  /**
    * {@inheritDoc}
    *
    * Method to aggregate all the counters in the MetricRegistry and form a
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d3f1ecd..5d7d089 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -205,7 +205,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
   private int blockOutputActiveBlocks;
   /** Bounded ThreadPool for this instance. */
   private ExecutorService boundedThreadPool;
-  private WriteThreadPoolSizeManager poolSizeManager;
+  private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
 
   /** ABFS instance reference to be held by the store to avoid GC close. */
   private BackReference fsBackRef;
@@ -281,11 +281,10 @@ public AzureBlobFileSystemStore(
     this.blockFactory = abfsStoreBuilder.blockFactory;
     this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
     if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
-      this.poolSizeManager = WriteThreadPoolSizeManager.getInstance(
+      this.writeThreadPoolSizeManager = WriteThreadPoolSizeManager.getInstance(
           getClient().getFileSystem() + "-" + UUID.randomUUID(),
-          abfsConfiguration);
-      poolSizeManager.startCPUMonitoring();
-      this.boundedThreadPool = poolSizeManager.getExecutorService();
+          abfsConfiguration, getClient().getAbfsCounters());
+      this.boundedThreadPool = writeThreadPoolSizeManager.getExecutorService();
     } else {
       this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
           abfsConfiguration.getWriteConcurrentRequestCount(),
@@ -343,7 +342,7 @@ public void close() throws IOException {
     } catch (ExecutionException e) {
       LOG.error("Error freeing leases", e);
     } finally {
-      IOUtils.cleanupWithLogger(LOG, poolSizeManager, getClientHandler());
+      IOUtils.cleanupWithLogger(LOG, writeThreadPoolSizeManager, getClientHandler());
     }
   }
 
@@ -822,6 +821,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
             .withPath(path)
             .withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
                 blockOutputActiveBlocks, true))
+            .withWriteThreadPoolManager(writeThreadPoolSizeManager)
             .withTracingContext(tracingContext)
             .withAbfsBackRef(fsBackRef)
             .withIngressServiceType(abfsConfiguration.getIngressServiceType())
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
index d7887b6..24aecb2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
@@ -18,17 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Closeable;
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
-
-import com.sun.management.OperatingSystemMXBean;
-
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -39,19 +30,32 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
+import org.apache.hadoop.fs.azurebfs.services.ResourceUtilizationStats;
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
 
 /**
  * Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
@@ -78,10 +82,20 @@ public final class WriteThreadPoolSizeManager implements Closeable {
   private final String filesystemName;
   /* Initial size for the thread pool when created. */
   private final int initialPoolSize;
-  /* Initially available heap memory. */
-  private final long initialAvailableHeapMemory;
   /* The configuration instance. */
   private final AbfsConfiguration abfsConfiguration;
+  /* Metrics collector for monitoring the performance of the ABFS write thread pool.  */
+  private final AbfsWriteResourceUtilizationMetrics writeThreadPoolMetrics;
+  /* Flag indicating if CPU monitoring has started. */
+  private volatile boolean isMonitoringStarted = false;
+  /* Tracks the last scale direction applied, or empty if none. */
+  private volatile String lastScaleDirection = EMPTY_STRING;
+  /* Maximum CPU utilization observed during the monitoring interval. */
+  private volatile double maxJvmCpuUtilization = 0.0;
+  /** High memory usage threshold used to trigger thread pool downscaling. */
+  private final double highMemoryThreshold;
+  /** Low memory usage threshold used to allow thread pool upscaling. */
+  private final double lowMemoryThreshold;
 
   /**
    * Private constructor to initialize the write thread pool and CPU monitor executor
@@ -89,16 +103,17 @@ public final class WriteThreadPoolSizeManager implements Closeable {
    *
    * @param filesystemName       Name of the ABFS filesystem.
    * @param abfsConfiguration    Configuration containing pool size parameters.
+   * @param abfsCounters         ABFS counters instance used for metrics.
    */
   private WriteThreadPoolSizeManager(String filesystemName,
-      AbfsConfiguration abfsConfiguration) {
+      AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
+    /* Retrieves and assigns the write thread pool metrics from the ABFS client counters. */
+    this.writeThreadPoolMetrics = abfsCounters.getAbfsWriteResourceUtilizationMetrics();
     this.filesystemName = filesystemName;
     this.abfsConfiguration = abfsConfiguration;
     int availableProcessors = Runtime.getRuntime().availableProcessors();
-    /* Get the heap space available when the instance is created */
-    this.initialAvailableHeapMemory = getAvailableHeapMemory();
     /* Compute the max pool size */
-    int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, initialAvailableHeapMemory);
+    int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, ResourceUtilizationUtils.getAvailableMaxHeapMemory());
 
     /* Get the initial pool size from config, fallback to at least 1 */
     this.initialPoolSize = Math.max(1,
@@ -116,11 +131,13 @@ private WriteThreadPoolSizeManager(String filesystemName,
         }
     );
     ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
-    executor.setKeepAliveTime(
-        abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS);
+    int keepAlive = Math.max(1, abfsConfiguration.getWriteThreadPoolKeepAliveTime());
+    executor.setKeepAliveTime(keepAlive, TimeUnit.SECONDS);
     executor.allowCoreThreadTimeOut(true);
     /* Create a scheduled executor for CPU monitoring and pool adjustment */
     this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
+    highMemoryThreshold = abfsConfiguration.getWriteHighMemoryUsageThresholdPercent() / HUNDRED_D;
+    lowMemoryThreshold = abfsConfiguration.getWriteLowMemoryUsageThresholdPercent() / HUNDRED_D;
   }
 
   /** Returns the internal {@link AbfsConfiguration}. */
@@ -146,23 +163,13 @@ private int getComputedMaxPoolSize(final int availableProcessors, long initialAv
   }
 
   /**
-   * Calculates the available heap memory in gigabytes.
-   * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap memory
-   * allowed for the JVM and subtracts the currently used memory (total - free)
-   * to determine how much heap memory is still available.
-   * The result is rounded up to the nearest gigabyte.
+   * Determines the maximum thread count based on available heap memory and CPU cores.
+   * Calculates the thread count as {@code availableProcessors × multiplier}, where the
+   * multiplier is selected according to the heap memory tier (low, medium, or high).
    *
-   * @return the available heap memory in gigabytes
-   */
-  private long getAvailableHeapMemory() {
-    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
-    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
-    long availableHeapBytes = memoryUsage.getMax() - memoryUsage.getUsed();
-    return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
-  }
-
-  /**
-   * Returns aggressive thread count = CPU cores × multiplier based on heap tier.
+   * @param availableHeapGB       the available heap memory in gigabytes.
+   * @param availableProcessors   the number of available CPU cores.
+   * @return the maximum thread count based on memory tier and processor count.
    */
   private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessors) {
     int multiplier;
@@ -177,15 +184,17 @@ private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessor
   }
 
   /**
-   * Returns the singleton instance of WriteThreadPoolSizeManager for the given filesystem.
+   * Returns the singleton {@link WriteThreadPoolSizeManager} instance for the specified filesystem.
+   * If an active instance already exists in the manager map for the given filesystem, it is returned.
+   * Otherwise, a new instance is created, registered in the map, and returned.
    *
-   * @param filesystemName the name of the filesystem.
-   * @param abfsConfiguration the configuration for the ABFS.
-   *
-   * @return the singleton instance.
+   * @param filesystemName     the name of the filesystem.
+   * @param abfsConfiguration  the {@link AbfsConfiguration} associated with the filesystem.
+   * @param abfsCounters       the {@link AbfsCounters} used to initialize the manager.
+   * @return  the singleton {@link WriteThreadPoolSizeManager} instance for the given filesystem.
    */
   public static synchronized WriteThreadPoolSizeManager getInstance(
-      String filesystemName, AbfsConfiguration abfsConfiguration) {
+      String filesystemName, AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
     /* Check if an instance already exists in the map for the given filesystem */
     WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
         filesystemName);
@@ -201,7 +210,7 @@ public static synchronized WriteThreadPoolSizeManager getInstance(
         "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
         filesystemName);
     WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
-        filesystemName, abfsConfiguration);
+        filesystemName, abfsConfiguration, abfsCounters);
     POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
     return newInstance;
   }
@@ -232,34 +241,22 @@ private void adjustThreadPoolSize(int newMaxPoolSize) {
   /**
    * Starts monitoring the CPU utilization and adjusts the thread pool size accordingly.
    */
-  synchronized void startCPUMonitoring() {
-    cpuMonitorExecutor.scheduleAtFixedRate(() -> {
-      double cpuUtilization = getCpuUtilization();
-      LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
-      try {
-        adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
-      } catch (InterruptedException e) {
-        throw new RuntimeException(String.format(
-            "Thread pool size adjustment interrupted for filesystem %s",
-            filesystemName), e);
-      }
-    }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(), TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * Gets the current system CPU utilization.
-   *
-   * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if unavailable.
-   */
-  private double getCpuUtilization() {
-    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
-        OperatingSystemMXBean.class);
-    double cpuLoad = osBean.getSystemCpuLoad();
-    if (cpuLoad < 0) {
-      LOG.warn("System CPU load value unavailable (returned -1.0). Defaulting to 0.0.");
-      return 0.0;
+  public synchronized void startCPUMonitoring() {
+    if (!isMonitoringStarted()) {
+      isMonitoringStarted = true;
+      cpuMonitorExecutor.scheduleAtFixedRate(() -> {
+            double cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
+            LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
+            try {
+              adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(String.format(
+                  "Thread pool size adjustment interrupted for filesystem %s",
+                  filesystemName), e);
+            }
+          }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(),
+          TimeUnit.MILLISECONDS);
     }
-    return cpuLoad;
   }
 
   /**
@@ -267,31 +264,68 @@ private double getCpuUtilization() {
    * and available heap memory relative to the initially available heap.
    *
    * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
-   * @throws InterruptedException if thread locking is interrupted
+   * @throws InterruptedException if the resizing operation is interrupted while acquiring the lock
    */
   public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws InterruptedException {
     lock.lock();
     try {
       ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
       int currentPoolSize = executor.getMaximumPoolSize();
-      long currentHeap = getAvailableHeapMemory();
-      long initialHeap = initialAvailableHeapMemory;
-      LOG.debug("Available heap memory: {} GB, Initial heap memory: {} GB", currentHeap, initialHeap);
-      LOG.debug("Current CPU Utilization: {}", cpuUtilization);
-
+      double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
+      LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad, cpuUtilization);
       if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) {
-        newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, currentHeap, initialHeap);
+        newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, memoryLoad);
+        if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
+          lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
+        }
       } else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) {
-        newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, currentHeap, initialHeap);
+        newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, memoryLoad);
+        if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
+          lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
+        }
       } else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) {
-        newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, currentHeap, initialHeap);
+        newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, memoryLoad);
+        if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize == maxThreadPoolSize) {
+          lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
+        }
       } else {
         newMaxPoolSize = currentPoolSize;
         LOG.debug("CPU load normal ({}). No change: current={}", cpuUtilization, currentPoolSize);
       }
-      if (newMaxPoolSize != currentPoolSize) {
+      boolean willResize = newMaxPoolSize != currentPoolSize;
+      if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
+        WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+        // Update the write thread pool metrics with the latest statistics snapshot.
+        writeThreadPoolMetrics.update(stats);
+      }
+      // Case 1: CPU increased — push metrics ONLY if not resizing
+      if (cpuUtilization > maxJvmCpuUtilization) {
+        maxJvmCpuUtilization = cpuUtilization;
+        if (!willResize) {
+          try {
+            // Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
+            WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+            // Update the write thread pool metrics with the latest statistics snapshot.
+            writeThreadPoolMetrics.update(stats);
+          } catch (Exception e) {
+            LOG.debug("Error updating write thread pool metrics", e);
+          }
+        }
+      }
+      // Case 2: Resize — always push metrics
+      if (willResize) {
         LOG.debug("Resizing thread pool from {} to {}", currentPoolSize, newMaxPoolSize);
+        // Record scale direction
+        lastScaleDirection = (newMaxPoolSize > currentPoolSize) ? SCALE_DIRECTION_UP : SCALE_DIRECTION_DOWN;
         adjustThreadPoolSize(newMaxPoolSize);
+        try {
+          // Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
+          WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+          // Update the write thread pool metrics with the latest statistics snapshot.
+          writeThreadPoolMetrics.update(stats);
+        } catch (Exception e) {
+          LOG.debug("Error updating write thread pool metrics after resizing.", e);
+        }
       }
     } finally {
       lock.unlock();
@@ -299,12 +333,20 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
   }
 
   /**
-   * Calculates reduced pool size under high CPU utilization.
+   * Calculates a reduced thread pool size when high CPU utilization is detected.
+   * The reduction strategy depends on available heap memory:
+   * if heap usage is high (low free memory), the pool size is reduced aggressively;
+   * otherwise, it is reduced moderately to prevent resource contention.
+   *
+   * @param currentPoolSize  the current size of the thread pool.
+   * @param memoryLoad       the current JVM heap load (0.0–1.0).
+   * @return the adjusted (reduced) pool size based on CPU and memory conditions.
    */
-  private int calculateReducedPoolSizeHighCPU(int currentPoolSize, long currentHeap, long initialHeap) {
-    if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
-      LOG.debug("High CPU & low heap. Aggressively reducing: current={}, new={}",
-          currentPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
+  private int calculateReducedPoolSizeHighCPU(int currentPoolSize, double memoryLoad) {
+    LOG.debug("The high cpu memory load is {}", memoryLoad);
+    if (memoryLoad > highMemoryThreshold) {
+      LOG.debug("High CPU & high memory load ({}). Aggressive reduction: current={}, new={}",
+          memoryLoad, currentPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
       return Math.max(initialPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
     }
     int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / HIGH_CPU_REDUCTION_FACTOR);
@@ -314,12 +356,21 @@ private int calculateReducedPoolSizeHighCPU(int currentPoolSize, long currentHea
   }
 
   /**
-   * Calculates reduced pool size under medium CPU utilization.
+   * Calculates a reduced thread pool size when medium CPU utilization is detected.
+   * The reduction is based on available heap memory: if memory is low, the pool size
+   * is reduced more aggressively; otherwise, a moderate reduction is applied to
+   * maintain balanced performance.
+   *
+   * @param currentPoolSize  the current size of the thread pool.
+   * @param memoryLoad       the current JVM heap load (0.0–1.0).
+   * @return the adjusted (reduced) pool size based on medium CPU and memory conditions.
    */
-  private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long currentHeap, long initialHeap) {
-    if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
+  private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, double memoryLoad) {
+    LOG.debug("The medium cpu memory load is {}", memoryLoad);
+    if (memoryLoad > highMemoryThreshold) {
       int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
-      LOG.debug("Medium CPU & low heap. Reducing: current={}, new={}", currentPoolSize, reduced);
+      LOG.debug("Medium CPU & high memory load ({}). Reducing: current={}, new={}",
+          memoryLoad, currentPoolSize, reduced);
       return reduced;
     }
     int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_REDUCTION_FACTOR);
@@ -329,22 +380,29 @@ private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long currentH
   }
 
   /**
-   * Calculates increased pool size under low CPU utilization.
+   * Calculates an adjusted thread pool size when low CPU utilization is detected.
+   * If sufficient heap memory is available, the pool size is increased to improve throughput.
+   * Otherwise, it is slightly decreased to conserve memory resources.
+   *
+   * @param currentPoolSize  the current size of the thread pool.
+   * @param memoryLoad       the current JVM heap load (0.0–1.0).
+   * @return the adjusted (increased or decreased) pool size based on CPU and memory conditions.
    */
-  private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, long currentHeap, long initialHeap) {
-    if (currentHeap >= initialHeap * LOW_CPU_HEAP_FACTOR) {
+  private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, double memoryLoad) {
+    LOG.debug("The low cpu memory load is {}", memoryLoad);
+    if (memoryLoad <= lowMemoryThreshold) {
       int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize * LOW_CPU_POOL_SIZE_INCREASE_FACTOR));
-      LOG.debug("Low CPU & healthy heap. Increasing: current={}, new={}", currentPoolSize, increased);
+      LOG.debug("Low CPU & low memory load ({}). Increasing: current={}, new={}",
+          memoryLoad, currentPoolSize, increased);
       return increased;
     } else {
       // Decrease by 10%
       int decreased = Math.max(1, (int) (currentPoolSize * LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
-      LOG.debug("Low CPU but insufficient heap ({} GB). Decreasing: current={}, new={}", currentHeap, currentPoolSize, decreased);
+      LOG.debug("Low CPU but insufficient heap. Decreasing: current={}, new={}", currentPoolSize, decreased);
       return decreased;
     }
   }
 
-
   /**
    * Returns the executor service for the thread pool.
    *
@@ -364,6 +422,26 @@ public ScheduledExecutorService getCpuMonitorExecutor() {
   }
 
   /**
+   * Checks if monitoring has started.
+   *
+   * @return true if monitoring has started, false otherwise.
+   */
+  public synchronized boolean isMonitoringStarted() {
+    return isMonitoringStarted;
+  }
+
+  /**
+   * Returns the maximum JVM CPU utilization observed during the current
+   * monitoring interval or since the last reset.
+   *
+   * @return the highest JVM CPU utilization recorded, as a fraction (0.0 to 1.0)
+   */
+  @VisibleForTesting
+  public double getMaxJvmCpuUtilization() {
+    return maxJvmCpuUtilization;
+  }
+
+  /**
    * Closes this manager by shutting down executors and cleaning up resources.
    * Removes the instance from the active manager map.
    *
@@ -394,4 +472,87 @@ public void close() throws IOException {
       }
     }
   }
+
+  /**
+   * Represents current statistics of the write thread pool and system.
+   */
+  public static class WriteThreadPoolStats extends ResourceUtilizationStats {
+
+    /**
+     * Constructs a {@link WriteThreadPoolStats} instance containing thread pool
+     * metrics and JVM/system resource utilization details.
+     *
+     * @param currentPoolSize the current number of threads in the pool
+     * @param maxPoolSize the maximum number of threads permitted in the pool
+     * @param activeThreads the number of threads actively executing tasks
+     * @param idleThreads the number of idle threads in the pool
+     * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
+     * @param systemCpuUtilization the current system-wide CPU utilization (0.0–1.0)
+     * @param availableHeapGB the available heap memory in gigabytes
+     * @param committedHeapGB the committed heap memory in gigabytes
+     * @param usedHeapGB the used heap memory in gigabytes
+     * @param maxHeapGB the maximum heap memory in gigabytes
+     * @param memoryLoad the JVM memory load (used / max)
+     * @param lastScaleDirection the last scaling action performed: "I" (increase),
+     * "D" (decrease), or empty if no scaling occurred
+     * @param maxCpuUtilization the peak JVM CPU utilization observed during this interval
+     * @param jvmProcessId the process ID of the JVM
+     */
+    public WriteThreadPoolStats(int currentPoolSize,
+        int maxPoolSize, int activeThreads, int idleThreads,
+        double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
+        double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad, String lastScaleDirection,
+        double maxCpuUtilization, long jvmProcessId) {
+      super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
+          jvmCpuLoad, systemCpuUtilization, availableHeapGB,
+          committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
+          maxCpuUtilization, jvmProcessId);
+    }
+  }
+
+  /**
+   * Returns the latest statistics for the write thread pool and system resources.
+   * The snapshot includes thread counts, JVM and system CPU utilization, and the
+   * current heap usage. These metrics are used for monitoring and making dynamic
+   * sizing decisions for the write thread pool.
+   *
+   * @param jvmCpuUtilization current JVM CPU usage as a fraction (0.0 to 1.0)
+   * @param memoryLoad        current JVM memory load (used/max)
+   * @return a {@link WriteThreadPoolStats} object containing the current metrics
+   */
+  synchronized WriteThreadPoolStats getCurrentStats(double jvmCpuUtilization,
+      double memoryLoad) {
+
+    if (boundedThreadPool == null) {
+      return new WriteThreadPoolStats(
+          ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D,
+          ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+    }
+
+    ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;
+
+    String currentScaleDirection = lastScaleDirection;
+    lastScaleDirection = EMPTY_STRING;
+
+    int poolSize = exec.getPoolSize();
+    int activeThreads = exec.getActiveCount();
+    int idleThreads = poolSize - activeThreads;
+
+    return new WriteThreadPoolStats(
+        poolSize,                      // Current thread count
+        exec.getMaximumPoolSize(),     // Max allowed threads
+        activeThreads,                 // Busy threads
+        idleThreads,                   // Idle threads
+        jvmCpuUtilization,             // JVM CPU usage (ratio)
+        ResourceUtilizationUtils.getSystemCpuLoad(),     // System CPU usage (ratio)
+        ResourceUtilizationUtils.getAvailableHeapMemory(),      // Free heap (GB)
+        ResourceUtilizationUtils.getCommittedHeapMemory(),      // Committed heap (GB)
+        ResourceUtilizationUtils.getUsedHeapMemory(),   // Used heap (GB)
+        ResourceUtilizationUtils.getMaxHeapMemory(),    // Max heap (GB)
+        memoryLoad,                    // used/max
+        currentScaleDirection,         // "I", "D", "-D", "+F", or ""
+        getMaxJvmCpuUtilization(),              // Peak JVM CPU usage so far
+        ResourceUtilizationUtils.getJvmProcessId()              // JVM PID
+    );
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 91fc97e..3de55ad 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -537,6 +537,16 @@ public static String containerProperty(String property, String fsName, String ac
   /** Configuration key for the high-tier memory multiplier for write workloads. Value: {@value}. */
   public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = "fs.azure.write.high.tier.memory.multiplier";
 
+  /**
+   * Threshold percentage of heap usage above which the write thread pool is scaled down.
+   */
+  public static final String FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.write.high.memory.usage.threshold.percent";
+
+  /**
+   * Threshold percentage of heap usage below which the write thread pool may be scaled up.
+   */
+  public static final String FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.write.low.memory.usage.threshold.percent";
+
   /**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/
   public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";
   /**Flag to enable/disable create idempotency during create operation: {@value}*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 8c5bc22..1be9eca 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -245,6 +245,28 @@ public final class FileSystemConfigurations {
   public static final int HUNDRED = 100;
   public static final double HUNDRED_D = 100.0;
   public static final long THOUSAND = 1000L;
+  // Indicates a successful scale-up operation
+  public static final int SCALE_UP = 1;
+  // Indicates a successful scale-down operation
+  public static final int SCALE_DOWN = -1;
+  // Indicates a down-scale was requested but already at minimum
+  public static final int NO_SCALE_DOWN_AT_MIN = -2;
+  // Indicates an up-scale was requested but already at maximum
+  public static final int NO_SCALE_UP_AT_MAX = 2;
+  // Indicates no scaling action was taken
+  public static final int SCALE_NONE = 0;
+  // Indicates no action is needed based on current metrics
+  public static final int NO_ACTION_NEEDED = 3;
+  // Indicates a successful scale-up operation
+  public static final String SCALE_DIRECTION_UP = "I";
+  // Indicates a successful scale-down operation
+  public static final String SCALE_DIRECTION_DOWN = "D";
+  // Indicates a down-scale was requested but pool is already at minimum
+  public static final String SCALE_DIRECTION_NO_DOWN_AT_MIN = "-D";
+  // Indicates an up-scale was requested but pool is already at maximum
+  public static final String SCALE_DIRECTION_NO_UP_AT_MAX = "+F";
+  // Indicates no scaling action is needed based on current metrics
+  public static final String SCALE_DIRECTION_NO_ACTION_NEEDED = "NA";
 
   public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
       = HttpOperationType.APACHE_HTTP_CLIENT;
@@ -345,11 +367,6 @@ public final class FileSystemConfigurations {
   public static final int DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 60;
 
   /**
-   * Minimum CPU utilization percentage considered as low threshold for write scaling.
-   */
-  public static final int MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT = 10;
-
-  /**
    * Maximum CPU utilization percentage considered as low threshold for write scaling.
    */
   public static final int MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT = 40;
@@ -389,6 +406,12 @@ public final class FileSystemConfigurations {
    */
   public static final int DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 16;
 
+  /** Percentage threshold of heap usage at which memory pressure is considered high. */
+  public static final int DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 60;
+
+  /** Percentage threshold of heap usage at which memory pressure is considered low. */
+  public static final int DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 30;
+
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;
 
   public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadResourceUtilizationMetricsEnum.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadResourceUtilizationMetricsEnum.java
new file mode 100644
index 0000000..a9d871b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadResourceUtilizationMetricsEnum.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Enum representing the set of metrics tracked for the ABFS read thread pool.
+ * Each metric includes a short name used for reporting and its corresponding
+ * {@link StatisticTypeEnum}, which defines how the metric is measured (e.g., gauge).
+ */
+public enum AbfsReadResourceUtilizationMetricsEnum implements
+    AbfsResourceUtilizationMetricsEnum {
+
+  /** Current number of threads in the read thread pool. */
+  CURRENT_POOL_SIZE("CP", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Maximum configured size of the read thread pool. */
+  MAX_POOL_SIZE("MP", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Number of threads currently executing read operations. */
+  ACTIVE_THREADS("AT", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Number of threads currently idle. */
+  IDLE_THREADS("IT", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Recent JVM CPU load value as reported by the JVM (0.0 to 1.0). */
+  JVM_CPU_UTILIZATION("JC", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Overall system-wide CPU utilization percentage during read operations. */
+  SYSTEM_CPU_UTILIZATION("SC", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Available heap memory (in GB) measured during read operations. */
+  AVAILABLE_MEMORY("AM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Committed heap memory (in GB) measured during read operations. */
+  COMMITTED_MEMORY("CM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Used heap memory (in GB) measured during read operations. */
+  USED_MEMORY("UM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Maximum heap memory (in GB) measured during read operations. */
+  MAX_HEAP_MEMORY("MM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** JVM memory load (used / max) measured during read operations. */
+  MEMORY_LOAD("ML", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Direction of the last scaling decision (e.g., scale-up or scale-down). */
+  LAST_SCALE_DIRECTION("SD", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Maximum CPU utilization recorded during the monitoring interval. */
+  MAX_CPU_UTILIZATION("MC", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** The process ID (PID) of the running JVM, useful for correlating metrics with system-level process information. */
+  JVM_PROCESS_ID("JI", StatisticTypeEnum.TYPE_GAUGE);
+
+  private final String name;
+  private final StatisticTypeEnum statisticType;
+
+  /**
+   * Constructs a metric enum constant with its short name and type.
+   *
+   * @param name  the short name or label for the metric.
+   * @param type  the {@link StatisticTypeEnum} indicating the metric type.
+   */
+  AbfsReadResourceUtilizationMetricsEnum(String name, StatisticTypeEnum type) {
+    this.name = name;
+    this.statisticType = type;
+  }
+
+  /**
+   * Returns the short name of the metric.
+   *
+   * @return the metric name.
+   */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Returns the {@link StatisticTypeEnum} associated with this metric.
+   *
+   * @return the metric's statistic type.
+   */
+  @Override
+  public StatisticTypeEnum getStatisticType() {
+    return statisticType;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsResourceUtilizationMetricsEnum.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsResourceUtilizationMetricsEnum.java
new file mode 100644
index 0000000..c4ffaaf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsResourceUtilizationMetricsEnum.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Defines the contract for all ABFS resource-level metric keys.
+ * <p>
+ * Metric enums implementing this interface supply the metric name and its
+ * {@link StatisticTypeEnum} (e.g., gauge or counter), allowing consistent
+ * registration and updates across ABFS metric sources.
+ * </p>
+ */
+public interface AbfsResourceUtilizationMetricsEnum {
+
+  /**
+   * Returns the unique metric name used for registration and reporting.
+   *
+   * @return the metric name
+   */
+  String getName();
+
+  /**
+   * Returns the statistic type associated with this metric
+   * (gauge, counter, etc.).
+   *
+   * @return the metric's {@link StatisticTypeEnum}
+   */
+  StatisticTypeEnum getStatisticType();
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
new file mode 100644
index 0000000..1f92626
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Enum representing the set of metrics tracked for the ABFS write thread pool.
+ * Each metric entry defines a short name identifier and its corresponding
+ * {@link StatisticTypeEnum}, which specifies the type of measurement (e.g., gauge).
+ * These metrics are used for monitoring and analyzing the performance and
+ * resource utilization of the write thread pool.
+ */
+public enum AbfsWriteResourceUtilizationMetricsEnum implements
+    AbfsResourceUtilizationMetricsEnum {
+
+  /** Current number of threads in the write thread pool. */
+  CURRENT_POOL_SIZE("CP", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Maximum configured size of the write thread pool. */
+  MAX_POOL_SIZE("MP", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Number of threads currently executing write operations. */
+  ACTIVE_THREADS("AT", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Number of threads currently idle. */
+  IDLE_THREADS("IT", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Recent JVM CPU load value as reported by the JVM (0.0 to 1.0). */
+  JVM_CPU_UTILIZATION("JC", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Overall system-wide CPU utilization percentage during write operations. */
+  SYSTEM_CPU_UTILIZATION("SC", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Available heap memory (in GB) measured during write operations. */
+  AVAILABLE_MEMORY("AM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Committed heap memory (in GB) measured during write operations. */
+  COMMITTED_MEMORY("CM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Used heap memory (in GB) measured during write operations. */
+  USED_MEMORY("UM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Maximum heap memory (in GB) measured during write operations. */
+  MAX_HEAP_MEMORY("MM", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** JVM memory load (used / max) measured during write operations. */
+  MEMORY_LOAD("ML", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Direction of the last scaling decision (e.g., scale-up or scale-down). */
+  LAST_SCALE_DIRECTION("SD", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** Maximum CPU utilization recorded during the monitoring interval. */
+  MAX_CPU_UTILIZATION("MC", StatisticTypeEnum.TYPE_GAUGE),
+
+  /** The process ID (PID) of the running JVM, useful for correlating metrics with system-level process information. */
+  JVM_PROCESS_ID("JI", StatisticTypeEnum.TYPE_GAUGE);
+
+  private final String name;
+  private final StatisticTypeEnum statisticType;
+
+  /**
+   * Constructs a metric definition for the ABFS write thread pool.
+   *
+   * @param name  the short name identifier for the metric.
+   * @param type  the {@link StatisticTypeEnum} describing the metric type.
+   */
+  AbfsWriteResourceUtilizationMetricsEnum(String name, StatisticTypeEnum type) {
+    this.name = name;
+    this.statisticType = type;
+  }
+
+  /**
+   * Returns the short name identifier of the metric.
+   *
+   * @return the metric name.
+   */
+  @Override
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Returns the {@link StatisticTypeEnum} associated with this metric.
+   *
+   * @return the metric's statistic type.
+   */
+  @Override
+  public StatisticTypeEnum getStatisticType() {
+    return statisticType;
+  }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 0210dd5..43171ae 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -1334,7 +1334,12 @@ public AbfsRestOperation read(final String path,
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
-
+    // Retrieve the read thread pool metrics from the ABFS counters.
+    AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
+    // If metrics are available, record them in the tracing context for diagnostics or logging.
+    if (readResourceUtilizationMetrics != null) {
+      tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+    }
     URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
         AbfsRestOperationType.GetBlob,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 74bddff..18e8183 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -312,8 +312,15 @@ private AbfsClient(final URL baseUrl,
           metricIdlePeriod,
           metricIdlePeriod);
     }
+    // Initialize write thread pool metrics if dynamic write thread pool scaling is enabled.
+    if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+      abfsCounters.initializeWriteResourceUtilizationMetrics();
+    }
     this.abfsMetricUrl = abfsConfiguration.getMetricUri();
-
+    // Initialize read thread pool metrics if ReadAheadV2 and its dynamic scaling feature are enabled.
+    if (abfsConfiguration.isReadAheadV2Enabled() && abfsConfiguration.isReadAheadV2DynamicScalingEnabled()) {
+      abfsCounters.initializeReadResourceUtilizationMetrics();
+    }
     final Class<? extends IdentityTransformerInterface> identityTransformerClass =
         abfsConfiguration.getRawConfiguration().getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
             IdentityTransformerInterface.class);
@@ -1881,6 +1888,16 @@ protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationTy
   }
 
   /**
+   * Retrieves the current read thread pool metrics from the ABFS counters.
+   *
+   * @return an {@link AbfsReadResourceUtilizationMetrics} instance containing
+   *         the latest statistics for the read thread pool, or {@code null} if not initialized
+   */
+  protected AbfsReadResourceUtilizationMetrics retrieveReadResourceUtilizationMetrics() {
+    return getAbfsCounters().getAbfsReadResourceUtilizationMetrics();
+  }
+
+  /**
    * Creates a VersionedFileStatus object from the ListResultEntrySchema.
    * @param entry ListResultEntrySchema object.
    * @param uri to be used for the path conversion.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
index 0a94b66..4512db98f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
@@ -83,5 +83,14 @@ String formString(String prefix, String separator, String suffix,
 
   AbfsReadFooterMetrics getAbfsReadFooterMetrics();
 
+  void initializeReadResourceUtilizationMetrics();
+
+  AbfsReadResourceUtilizationMetrics getAbfsReadResourceUtilizationMetrics();
+
+  void initializeWriteResourceUtilizationMetrics();
+
+  AbfsWriteResourceUtilizationMetrics getAbfsWriteResourceUtilizationMetrics();
+
   AtomicLong getLastExecutionTime();
+
 }
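
The interface only declares the hooks; the concrete AbfsCountersImpl changes are not part of this excerpt. A hedged, abridged sketch of how an implementation might satisfy them with lazily created metrics objects (SampleAbfsCounters and the field names are illustrative, and the remaining AbfsCounters methods are omitted):

    public class SampleAbfsCounters implements AbfsCounters {   // other methods omitted in this sketch
      private AbfsReadResourceUtilizationMetrics readMetrics;
      private AbfsWriteResourceUtilizationMetrics writeMetrics;

      @Override
      public void initializeReadResourceUtilizationMetrics() {
        if (readMetrics == null) {
          readMetrics = new AbfsReadResourceUtilizationMetrics();
        }
      }

      @Override
      public AbfsReadResourceUtilizationMetrics getAbfsReadResourceUtilizationMetrics() {
        return readMetrics;   // stays null unless explicitly initialized
      }

      // initializeWriteResourceUtilizationMetrics() / getAbfsWriteResourceUtilizationMetrics()
      // would mirror the read-side pair with AbfsWriteResourceUtilizationMetrics.
    }
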
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index 3a9244c..cf2449e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -1061,7 +1061,12 @@ public AbfsRestOperation read(final String path,
     String sasTokenForReuse = appendSASTokenToQuery(path,
         SASTokenProvider.READ_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
-
+    // Retrieve the read thread pool metrics from the ABFS counters.
+    AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
+    // If metrics are available, record them in the tracing context for diagnostics or logging.
+    if (readResourceUtilizationMetrics != null) {
+      tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+    }
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
         AbfsRestOperationType.ReadFile,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 9d29614..31b6f0f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -186,7 +186,7 @@ public AbfsInputStream(
     if (readAheadV2Enabled) {
       ReadBufferManagerV2.setReadBufferManagerConfigs(
           readAheadBlockSize, client.getAbfsConfiguration());
-      readBufferManager = ReadBufferManagerV2.getBufferManager();
+      readBufferManager = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     } else {
       ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
       readBufferManager = ReadBufferManagerV1.getBufferManager();
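
Because ReadBufferManagerV2 is a process-wide singleton (see its getBufferManager change later in this patch), the counters passed here only matter for the stream that triggers the first initialization. A hedged sketch of that behaviour; countersA, countersB and abfsConfiguration are illustrative names:

    ReadBufferManagerV2.setReadBufferManagerConfigs(readAheadBlockSize, abfsConfiguration);
    ReadBufferManager first = ReadBufferManagerV2.getBufferManager(countersA);  // creates the singleton
    ReadBufferManager second = ReadBufferManagerV2.getBufferManager(countersB); // reuses it
    // first == second: countersB is ignored once the singleton exists, so every stream in the JVM
    // reports read thread pool metrics through the counters captured at first use.
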
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 3abf21c..6c68782 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
@@ -167,6 +168,13 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
    */
   private MessageDigest fullBlobContentMd5 = null;
 
+  /**
+   * Instance of {@link WriteThreadPoolSizeManager} used by this class
+   * to dynamically adjust the write thread pool size based on
+   * system resource utilization.
+   */
+  private final WriteThreadPoolSizeManager writeThreadPoolSizeManager;
+
   public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
       throws IOException {
     this.statistics = abfsOutputStreamContext.getStatistics();
@@ -217,6 +225,9 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
     this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
     this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType();
     this.clientHandler = abfsOutputStreamContext.getClientHandler();
+    this.writeThreadPoolSizeManager = abfsOutputStreamContext.getWriteThreadPoolSizeManager();
+    // Initialize CPU monitoring if the pool size manager is present
+    initializeMonitoringIfNeeded();
     createIngressHandler(serviceTypeAtInit,
         abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
     try {
@@ -243,6 +254,21 @@ public AzureIngressHandler getIngressHandler() {
   private volatile boolean switchCompleted = false;
 
   /**
+   * Starts CPU monitoring in the thread pool size manager if it
+   * is initialized and not already monitoring.
+   */
+  private void initializeMonitoringIfNeeded() {
+    if (writeThreadPoolSizeManager != null && !writeThreadPoolSizeManager.isMonitoringStarted()) {
+      synchronized (this) {
+        // Re-check to avoid a race between threads
+        if (!writeThreadPoolSizeManager.isMonitoringStarted()) {
+          writeThreadPoolSizeManager.startCPUMonitoring();
+        }
+      }
+    }
+  }
+
+  /**
    * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters.
    * <p>
    * If the `ingressHandler` is already initialized and the switch operation is complete, the existing
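
A short, hedged usage sketch of the lazy start wired in above (context construction is abbreviated and contextWithPoolManager is an illustrative name): monitoring begins with the first stream whose context carries a pool size manager, and later streams see it as already running.

    AbfsOutputStream firstStream = new AbfsOutputStream(contextWithPoolManager);
    // initializeMonitoringIfNeeded() called writeThreadPoolSizeManager.startCPUMonitoring()
    AbfsOutputStream laterStream = new AbfsOutputStream(contextWithPoolManager);
    // isMonitoringStarted() is now true, so no second monitor is started for this manager
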
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index ceae24b..68a2ba0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -21,6 +21,7 @@
 import java.util.concurrent.ExecutorService;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -79,6 +80,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
 
   private AbfsClientHandler clientHandler;
 
+  /** Reference to the thread pool manager. */
+  private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
+
   public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -228,6 +232,12 @@ public AbfsOutputStreamContext withEncryptionAdapter(
     return this;
   }
 
+  public AbfsOutputStreamContext withWriteThreadPoolManager(
+      final WriteThreadPoolSizeManager writeThreadPoolSizeManager) {
+    this.writeThreadPoolSizeManager = writeThreadPoolSizeManager;
+    return this;
+  }
+
   public int getWriteBufferSize() {
     return writeBufferSize;
   }
@@ -328,6 +338,10 @@ public AbfsClientHandler getClientHandler() {
     return clientHandler;
   }
 
+  public WriteThreadPoolSizeManager getWriteThreadPoolSizeManager() {
+    return writeThreadPoolSizeManager;
+  }
+
   /**
    * Checks if small write is supported based on the current configuration.
    *
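
For completeness, a hedged sketch of how the new builder step slots into the fluent context construction; only the with/get pair comes from this patch, the rest of the chain is abbreviated:

    AbfsOutputStreamContext context =
        new AbfsOutputStreamContext(sasTokenRenewPeriodForStreamsInSeconds)
            // ... other with* builder calls elided ...
            .withWriteThreadPoolManager(writeThreadPoolSizeManager);
    WriteThreadPoolSizeManager manager = context.getWriteThreadPoolSizeManager();  // null if never set
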
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
new file mode 100644
index 0000000..a38dd08
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
@@ -0,0 +1,88 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadResourceUtilizationMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+
+/**
+ * Metrics container for the ABFS read thread pool.
+ * <p>
+ * This class captures thread-pool sizing, CPU utilization, memory usage,
+ * scaling direction, and other runtime statistics reported by
+ * {@link ReadBufferManagerV2.ReadThreadPoolStats}.
+ * </p>
+ */
+public class AbfsReadResourceUtilizationMetrics
+    extends
+    AbstractAbfsResourceUtilizationMetrics<AbfsReadResourceUtilizationMetricsEnum> {
+
+  /**
+   * Creates a metrics set for read operations, initializing all
+   * metric keys defined in {@link AbfsReadResourceUtilizationMetricsEnum}.
+   */
+  public AbfsReadResourceUtilizationMetrics() {
+    super(AbfsReadResourceUtilizationMetricsEnum.values(), FSOperationType.READ.toString());
+  }
+
+  /**
+   * Updates all read-thread-pool metrics using the latest stats snapshot.
+   * <p>
+   * Each value from {@link ReadBufferManagerV2.ReadThreadPoolStats} is
+   * mapped to the corresponding metric, including:
+   * </p>
+   * <ul>
+   *   <li>Thread pool size (current, max, active, idle)</li>
+   *   <li>JVM and system CPU load (converted to percentage)</li>
+   *   <li>Available and committed memory</li>
+   *   <li>Memory load percentage</li>
+   *   <li>Scaling direction</li>
+   *   <li>Maximum CPU utilization observed</li>
+   *   <li>JVM process ID</li>
+   * </ul>
+   *
+   * @param stats the latest read-thread-pool statistics; ignored if {@code null}
+   */
+  public synchronized void update(ReadBufferManagerV2.ReadThreadPoolStats stats) {
+    if (stats == null) {
+      return;
+    }
+
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.CURRENT_POOL_SIZE, stats.getCurrentPoolSize());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_POOL_SIZE, stats.getMaxPoolSize());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.ACTIVE_THREADS, stats.getActiveThreads());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.IDLE_THREADS, stats.getIdleThreads());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad() * HUNDRED_D);
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization() * HUNDRED_D);
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, stats.getMemoryUtilization());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.COMMITTED_MEMORY, stats.getCommittedHeapGB());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.USED_MEMORY, stats.getUsedHeapGB());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, stats.getMaxHeapGB());
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad() * HUNDRED_D);
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
+        stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization() * HUNDRED_D);
+    setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_PROCESS_ID, stats.getJvmProcessId());
+
+    markUpdated();
+  }
+}
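
A hedged, self-contained sketch of how a snapshot flows into this class (all numbers are illustrative). Fractional loads are multiplied by HUNDRED_D before being stored, and gauges hold long values, so a JVM CPU load of 0.25 surfaces as 25:

    ReadBufferManagerV2.ReadThreadPoolStats stats = new ReadBufferManagerV2.ReadThreadPoolStats(
        4, 8, 2, 2,            // current pool size, max pool size, active threads, idle threads
        0.25, 0.40,            // JVM and system CPU load as fractions
        2.0, 3.0, 1.0, 4.0,    // available, committed, used, max heap in GB
        0.30,                  // memory load (used / max)
        SCALE_DIRECTION_UP,    // last scale direction marker
        0.55, 12345L);         // peak JVM CPU load, JVM process id
    AbfsReadResourceUtilizationMetrics metrics = new AbfsReadResourceUtilizationMetrics();
    metrics.update(stats);     // JVM_CPU_UTILIZATION is recorded as 25, SYSTEM_CPU_UTILIZATION as 40

The write-side class below follows the same shape, fed by WriteThreadPoolSizeManager.WriteThreadPoolStats.
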
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
new file mode 100644
index 0000000..bace442
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+
+import org.apache.hadoop.fs.azurebfs.enums.AbfsWriteResourceUtilizationMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
+
+/**
+ * Metrics container for the ABFS write thread pool.
+ * <p>
+ * This class records pool size, CPU utilization, memory usage,
+ * scaling direction, and other runtime indicators reported by
+ * {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}.
+ * </p>
+ */
+public class AbfsWriteResourceUtilizationMetrics
+    extends
+    AbstractAbfsResourceUtilizationMetrics<AbfsWriteResourceUtilizationMetricsEnum> {
+
+  /**
+   * Creates a metrics set for write operations, pre-initializing
+   * all metric keys defined in {@link AbfsWriteResourceUtilizationMetricsEnum}.
+   */
+  public AbfsWriteResourceUtilizationMetrics() {
+    super(AbfsWriteResourceUtilizationMetricsEnum.values(), FSOperationType.WRITE.toString());
+  }
+
+  /**
+   * Updates all write-thread-pool metrics using the latest stats snapshot.
+   * Each field in {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}
+   * is mapped to a corresponding metric.
+   *
+   * @param stats the latest thread-pool statistics; ignored if {@code null}
+   */
+  public synchronized void update(WriteThreadPoolSizeManager.WriteThreadPoolStats stats) {
+    if (stats == null) {
+      return;
+    }
+
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.CURRENT_POOL_SIZE, stats.getCurrentPoolSize());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_POOL_SIZE, stats.getMaxPoolSize());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.ACTIVE_THREADS, stats.getActiveThreads());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.IDLE_THREADS, stats.getIdleThreads());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad() * HUNDRED_D);
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization() * HUNDRED_D);
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, stats.getMemoryUtilization());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.COMMITTED_MEMORY, stats.getCommittedHeapGB());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.USED_MEMORY, stats.getUsedHeapGB());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, stats.getMaxHeapGB());
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad() * HUNDRED_D);
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
+        stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization() * HUNDRED_D);
+    setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_PROCESS_ID, stats.getJvmProcessId());
+
+    markUpdated();
+  }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
new file mode 100644
index 0000000..2e5a172
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsResourceUtilizationMetricsEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
+/**
+ * Abstract base class for tracking ABFS resource metrics, handling metric registration,
+ * updates, versioning, and compact serialization for diagnostics.
+ *
+ * @param <T> enum type implementing {@link AbfsResourceUtilizationMetricsEnum}
+ */
+public abstract class AbstractAbfsResourceUtilizationMetrics<T extends Enum<T> & AbfsResourceUtilizationMetricsEnum>
+    extends AbstractAbfsStatisticsSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbstractAbfsResourceUtilizationMetrics.class);
+
+  /**
+   * Tracks whether any metric has been updated at least once.
+   */
+  private final AtomicBoolean updatedAtLeastOnce = new AtomicBoolean(false);
+
+  /**
+   * A version counter incremented each time a metric update occurs.
+   * Used to detect whether metrics have changed since the last serialization.
+   */
+  private final AtomicLong updateVersion = new AtomicLong(0);
+
+  /**
+   * The last version number that was serialized and pushed out.
+   */
+  private final AtomicLong lastPushedVersion = new AtomicLong(-1);
+
+  /**
+   * The set of metrics supported by this metrics instance.
+   */
+  private final T[] metrics;
+
+  /**
+   * A short identifier describing the operation or subsystem these metrics represent.
+   * This prefix appears in the serialized results string.
+   */
+  private final String operationType;
+
+  /**
+   * Constructs the resource metrics abstraction.
+   * Registers gauges (and later counters) with the Hadoop {@link IOStatisticsStore}
+   * based on the metric enum values.
+   *
+   * @param metrics        all metric enum constants supported by this instance
+   * @param operationType  a short label used as the prefix when serializing metrics
+   */
+  protected AbstractAbfsResourceUtilizationMetrics(T[] metrics, String operationType) {
+    this.metrics = metrics;
+    this.operationType = operationType;
+
+    IOStatisticsStore store = iostatisticsStore()
+        .withGauges(getMetricNames(StatisticTypeEnum.TYPE_GAUGE))
+        .build();
+    setIOStatistics(store);
+  }
+
+  /**
+   * Extracts the names of metrics of the specified type.
+   *
+   * @param type the type of metrics to return (e.g., gauge, counter)
+   * @return an array of metric names of the given type
+   */
+  private String[] getMetricNames(StatisticTypeEnum type) {
+    return Arrays.stream(metrics)
+        .filter(m -> m.getStatisticType().equals(type))
+        .flatMap(m -> Stream.of(m.getName()))
+        .toArray(String[]::new);
+  }
+
+  /**
+   * Sets the value of a metric using its configured statistic type.
+   * <ul>
+   *   <li>For {@code TYPE_GAUGE}, the value overwrites the existing gauge value.</li>
+   *   <li>For {@code TYPE_COUNTER}, the value increments the counter.</li>
+   * </ul>
+   *
+   * @param metric the metric to update
+   * @param value  the numeric value to assign or increment
+   */
+  protected void setMetricValue(T metric, double value) {
+    switch (metric.getStatisticType()) {
+    case TYPE_GAUGE:
+      setGaugeValue(metric.getName(), (long) value);
+      break;
+    case TYPE_COUNTER:
+      setCounterValue(metric.getName(), (long) value);
+      break;
+    default:
+      LOG.warn("Unsupported metric type: {}", metric.getStatisticType());
+    }
+  }
+
+  /**
+   * Marks that a metric update has occurred.
+   * Increments the version so consumers know that new data is available.
+   */
+  protected void markUpdated() {
+    updatedAtLeastOnce.set(true);
+    updateVersion.incrementAndGet();
+  }
+
+  /**
+   * Returns a flag indicating whether any metric has been updated since initialization.
+   *
+   * @return the {@link AtomicBoolean} tracking whether at least one update occurred
+   */
+  public boolean getUpdatedAtLeastOnce() {
+    return updatedAtLeastOnce.get();
+  }
+
+  /**
+   * Serializes the current metrics to a compact string format suitable for logs.
+   * @return a serialized metrics string or an empty string if no updates occurred
+   */
+  @Override
+  public String toString() {
+    if (!updatedAtLeastOnce.get()) {
+      return EMPTY_STRING;
+    }
+
+    long currentVersion = updateVersion.get();
+    if (currentVersion == lastPushedVersion.get()) {
+      return EMPTY_STRING;
+    }
+
+    synchronized (this) {
+      if (currentVersion == lastPushedVersion.get()) {
+        return EMPTY_STRING;
+      }
+
+      StringBuilder sb = new StringBuilder(operationType).append(CHAR_EQUALS);
+
+      for (T metric : metrics) {
+        sb.append(metric.getName())
+            .append(CHAR_EQUALS)
+            .append(lookupGaugeValue(metric.getName()))
+            .append(CHAR_DOLLAR);
+      }
+
+      lastPushedVersion.set(currentVersion);
+      return sb.toString();
+    }
+  }
+}
+
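
To make the serialization contract concrete, and assuming CHAR_EQUALS is '=' and CHAR_DOLLAR is '$' (their actual values live in AbfsHttpConstants and MetricsConstants and are not shown in this excerpt), a comment-only sketch of what toString() produces:

    // First call after an update: a single compact line keyed by the operation label,
    //   <operationType>=<metricName>=<gaugeValue>$<metricName>=<gaugeValue>$...
    // A second call with no intervening markUpdated(): "" (the version gate suppresses duplicates),
    // so repeated requests do not re-send an unchanged snapshot in their tracing headers.
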
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
index 8610add..98ef0bc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
@@ -125,6 +125,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
     TracingContext tracingContextAppend = new TracingContext(tracingContext);
     tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
     tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
+    // Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
+    AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
+    if (writeResourceUtilizationMetrics != null) {
+      tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+    }
     try {
       LOG.trace("Starting remote write for block with ID {} and offset {}",
           blockToUpload.getBlockId(), blockToUpload.getOffset());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
index 96a8f07..ccf5080 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
@@ -117,6 +117,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
       AppendRequestParameters reqParams,
       TracingContext tracingContext) throws IOException {
     TracingContext tracingContextAppend = new TracingContext(tracingContext);
+    // Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
+    AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
+    if (writeResourceUtilizationMetrics != null) {
+      tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+    }
     String threadIdStr = String.valueOf(Thread.currentThread().getId());
     if (tracingContextAppend.getIngressHandler().equals(EMPTY_STRING)) {
       tracingContextAppend.setIngressHandler(DFS_APPEND + " T " + threadIdStr);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
index cfa0131..4664f1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
@@ -110,6 +110,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
       TracingContext tracingContext) throws IOException {
     AbfsRestOperation op;
     TracingContext tracingContextAppend = new TracingContext(tracingContext);
+    // Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
+    AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
+    if (writeResourceUtilizationMetrics != null) {
+      tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+    }
     String threadIdStr = String.valueOf(Thread.currentThread().getId());
     tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " + threadIdStr);
     tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
index 81007e1..48f7058 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
@@ -236,4 +236,15 @@ protected String computeFullBlobMd5() {
     }
     return fullBlobMd5;
   }
+
+  /**
+   * Helper that returns the write thread-pool metrics from the client's counters, if available.
+   *
+   * @return the {@link AbfsWriteResourceUtilizationMetrics} instance or {@code null} when not present
+   */
+  protected AbfsWriteResourceUtilizationMetrics getWriteResourceUtilizationMetrics() {
+    return getAbfsOutputStream().getClient()
+        .getAbfsCounters()
+        .getAbfsWriteResourceUtilizationMetrics();
+  }
 }
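
A hedged view of the end-to-end write path this helper completes; the WriteThreadPoolSizeManager update step is inferred from the write metrics class in this patch rather than shown in this excerpt:

    // AbfsClient constructor          -> abfsCounters.initializeWriteResourceUtilizationMetrics()
    // WriteThreadPoolSizeManager      -> getAbfsWriteResourceUtilizationMetrics().update(stats)  (inferred)
    // AzureIngressHandler.remoteWrite -> getWriteResourceUtilizationMetrics()                    (this helper)
    //                                 -> tracingContextAppend.setResourceUtilizationMetricResults(metrics.toString())
    // A null return simply means dynamic write thread pool scaling was never enabled, which is why
    // every remoteWrite() call site above null-checks before touching the tracing context.
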
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 7943755..d64c724 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -20,12 +20,7 @@
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 
-import com.sun.management.OperatingSystemMXBean;
-
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -43,11 +38,20 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
 
 /**
  * The Improved Read Buffer Manager for Rest AbfsClient.
@@ -105,14 +109,33 @@ public final class ReadBufferManagerV2 extends ReadBufferManager {
 
   private static AtomicBoolean isConfigured = new AtomicBoolean(false);
 
+  /* Metrics collector for monitoring the performance of the ABFS read thread pool.  */
+  private final AbfsReadResourceUtilizationMetrics readThreadPoolMetrics;
+  /* The ABFSCounters instance for updating read buffer manager related metrics. */
+  private final AbfsCounters abfsCounters;
+  /* Tracks the last scale direction applied, or empty if none. */
+  private volatile String lastScaleDirection = EMPTY_STRING;
+  /* Maximum CPU utilization observed during the monitoring interval. */
+  private volatile double maxJvmCpuUtilization = 0.0;
+
   /**
    * Private constructor to prevent instantiation as this needs to be singleton.
+   *
+   * @param abfsCounters the {@link AbfsCounters} used for managing read operations.
    */
-  private ReadBufferManagerV2() {
+  private ReadBufferManagerV2(AbfsCounters abfsCounters) {
+    this.abfsCounters = abfsCounters;
+    readThreadPoolMetrics = abfsCounters.getAbfsReadResourceUtilizationMetrics();
     printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
   }
 
-  static ReadBufferManagerV2 getBufferManager() {
+  /**
+   * Returns the singleton instance of {@code ReadBufferManagerV2}.
+   *
+   * @param abfsCounters the {@link AbfsCounters} used for read operations.
+   * @return the singleton instance of {@code ReadBufferManagerV2}.
+   */
+  static ReadBufferManagerV2 getBufferManager(AbfsCounters abfsCounters) {
     if (!isConfigured.get()) {
       throw new IllegalStateException("ReadBufferManagerV2 is not configured. "
           + "Please call setReadBufferManagerConfigs() before calling getBufferManager().");
@@ -121,7 +144,7 @@ static ReadBufferManagerV2 getBufferManager() {
       LOCK.lock();
       try {
         if (bufferManager == null) {
-          bufferManager = new ReadBufferManagerV2();
+          bufferManager = new ReadBufferManagerV2(abfsCounters);
           bufferManager.init();
           LOGGER.trace("ReadBufferManagerV2 singleton initialized");
         }
@@ -211,7 +234,7 @@ void init() {
         workerThreadFactory);
     workerPool.allowCoreThreadTimeOut(true);
     for (int i = 0; i < minThreadPoolSize; i++) {
-      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager(abfsCounters));
       workerRefs.add(worker);
       workerPool.submit(worker);
     }
@@ -743,7 +766,7 @@ private synchronized boolean tryMemoryUpscale() {
       printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
       return false; // Dynamic scaling is disabled, so no upscaling.
     }
-    double memoryLoad = getMemoryLoad();
+    double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
     if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
       // Create and Add more buffers in getFreeList().
       int nextIndx = getNumBuffers();
@@ -789,7 +812,7 @@ > getThresholdAgeMilliseconds()) {
       }
     }
 
-    double memoryLoad = getMemoryLoad();
+    double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
     if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
       synchronized (this) {
         if (isFreeListEmpty()) {
@@ -831,7 +854,10 @@ private boolean manualEviction(final ReadBuffer buf) {
    */
   private void adjustThreadPool() {
     int currentPoolSize = workerRefs.size();
-    double cpuLoad = getCpuLoad();
+    double cpuLoad = ResourceUtilizationUtils.getJvmCpuLoad();
+    if (cpuLoad > maxJvmCpuUtilization) {
+      maxJvmCpuUtilization = cpuLoad;
+    }
     int requiredPoolSize = getRequiredThreadPoolSize();
     int newThreadPoolSize;
     printTraceLog(
@@ -843,29 +869,62 @@ private void adjustThreadPool() {
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
                   / HUNDRED_D));
-      // Create new Worker Threads
-      for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
-        ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
-        workerRefs.add(worker);
-        workerPool.submit(worker);
+      if (currentPoolSize == maxThreadPoolSize || newThreadPoolSize == currentPoolSize) {
+        lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;   // Already full, cannot scale up
+      } else {
+        lastScaleDirection = SCALE_DIRECTION_UP;
       }
+      // Create new Worker Threads
+      if (SCALE_DIRECTION_UP.equals(lastScaleDirection)) {
+        for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+          ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager(abfsCounters));
+          workerRefs.add(worker);
+          workerPool.submit(worker);
+        }
+      }
+      // Capture the latest thread pool statistics (pool size, CPU, memory, etc.)
+      ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+      // Update the read thread pool metrics with the latest statistics snapshot.
+      readThreadPoolMetrics.update(stats);
       printTraceLog("Increased worker pool size from {} to {}", currentPoolSize,
           newThreadPoolSize);
+    } else if (cpuLoad < cpuThreshold && currentPoolSize > requiredPoolSize) {
+      lastScaleDirection = SCALE_DIRECTION_NO_ACTION_NEEDED;
+      // Capture the latest thread pool statistics (pool size, CPU, memory, etc.)
+      ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+      // Update the read thread pool metrics with the latest statistics snapshot.
+      readThreadPoolMetrics.update(stats);
     } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
       newThreadPoolSize = Math.max(minThreadPoolSize,
           (int) Math.ceil(
               (currentPoolSize * (HUNDRED_D - threadPoolDownscalePercentage))
                   / HUNDRED_D));
-      // Signal the extra workers to stop
-      while (workerRefs.size() > newThreadPoolSize) {
-        ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
-        worker.stop();
+      if (currentPoolSize == minThreadPoolSize || newThreadPoolSize == currentPoolSize) {
+        lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;  // Already at minimum, cannot scale down
+      } else {
+        lastScaleDirection = SCALE_DIRECTION_DOWN;
       }
+      if (SCALE_DIRECTION_DOWN.equals(lastScaleDirection)) {
+        // Signal the extra workers to stop
+        while (workerRefs.size() > newThreadPoolSize) {
+          ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+          worker.stop();
+        }
+      }
+      // Capture the latest thread pool statistics (pool size, CPU, memory, etc.)
+      ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+      // Update the read thread pool metrics with the latest statistics snapshot.
+      readThreadPoolMetrics.update(stats);
       printTraceLog("Decreased worker pool size from {} to {}", currentPoolSize,
           newThreadPoolSize);
     } else {
+      lastScaleDirection = EMPTY_STRING;
       printTraceLog("No change in worker pool size. CPU load: {} Pool size: {}",
           cpuLoad, currentPoolSize);
+      if (cpuLoad >= maxJvmCpuUtilization) {
+        ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+        readThreadPoolMetrics.update(stats); // publish snapshot
+      }
     }
   }
 
@@ -995,33 +1054,6 @@ private void printDebugLog(String message, Object... args) {
     LOGGER.debug(message, args);
   }
 
-  /**
-   * Get the current memory load of the JVM.
-   * @return the memory load as a double value between 0.0 and 1.0
-   */
-  @VisibleForTesting
-  double getMemoryLoad() {
-    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
-    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
-    return (double) memoryUsage.getUsed() / memoryUsage.getMax();
-  }
-
-  /**
-   * Get the current CPU load of the system.
-   * @return the CPU load as a double value between 0.0 and 1.0
-   */
-  @VisibleForTesting
-  public double getCpuLoad() {
-    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
-        OperatingSystemMXBean.class);
-    double cpuLoad = osBean.getSystemCpuLoad();
-    if (cpuLoad < 0) {
-      // If the CPU load is not available, return 0.0
-      return 0.0;
-    }
-    return cpuLoad;
-  }
-
   @VisibleForTesting
   synchronized static ReadBufferManagerV2 getInstance() {
     return bufferManager;
@@ -1057,6 +1089,17 @@ public ScheduledExecutorService getCpuMonitoringThread() {
     return cpuMonitorThread;
   }
 
+  /**
+   * Returns the maximum JVM CPU utilization observed during the current
+   * monitoring interval or since the last reset.
+   *
+   * @return the highest JVM CPU load recorded, as a fraction (0.0–1.0)
+   */
+  @VisibleForTesting
+  public double getMaxJvmCpuUtilization() {
+    return maxJvmCpuUtilization;
+  }
+
   public int getRequiredThreadPoolSize() {
     return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
         * (getReadAheadQueue().size()
@@ -1097,4 +1140,84 @@ private void incrementActiveBufferCount() {
   private void decrementActiveBufferCount() {
     numberOfActiveBuffers.getAndDecrement();
   }
+
+  /**
+   * Represents current statistics of the read thread pool and system.
+   */
+  public static class ReadThreadPoolStats extends ResourceUtilizationStats {
+
+    /**
+     * Constructs a {@link ReadThreadPoolStats} instance containing thread pool
+     * metrics and JVM/system resource utilization details.
+     *
+     * @param currentPoolSize the current number of threads in the pool
+     * @param maxPoolSize the maximum number of threads permitted in the pool
+     * @param activeThreads the number of threads actively executing tasks
+     * @param idleThreads the number of idle threads in the pool
+     * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
+     * @param systemCpuUtilization the current system-wide CPU utilization (0.0–1.0)
+     * @param availableHeapGB the available heap memory in gigabytes
+     * @param committedHeapGB the committed heap memory in gigabytes
+     * @param usedHeapGB the used heap memory in gigabytes
+     * @param maxHeapGB the maximum heap memory in gigabytes
+     * @param memoryLoad the JVM memory load (used / max)
+     * @param lastScaleDirection the last scaling action performed: "I" (increase),
+     * "D" (decrease), or empty if no scaling occurred
+     * @param maxCpuUtilization the peak JVM CPU utilization observed during this interval
+     * @param jvmProcessId the process ID of the JVM
+     */
+    public ReadThreadPoolStats(int currentPoolSize,
+        int maxPoolSize, int activeThreads, int idleThreads,
+        double jvmCpuLoad,
+        double systemCpuUtilization, double availableHeapGB,
+        double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad,
+        String lastScaleDirection, double maxCpuUtilization, long jvmProcessId) {
+      super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
+          jvmCpuLoad, systemCpuUtilization, availableHeapGB,
+          committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
+          maxCpuUtilization, jvmProcessId);
+    }
+  }
+
+  /**
+   * Creates and returns a snapshot of the current read thread pool and system metrics.
+   * This method captures live values such as pool size, active threads, JVM CPU load,
+   * system CPU utilization, available heap memory, and the maximum CPU utilization
+   * observed during the current interval.
+   *
+   * @param jvmCpuLoad the current JVM process CPU load as a fraction (0.0–1.0)
+   * @return a {@link ReadThreadPoolStats} object containing the current thread pool
+   *         and system resource statistics
+   */
+  synchronized ReadThreadPoolStats getCurrentStats(double jvmCpuLoad) {
+    if (workerPool == null) {
+      return new ReadThreadPoolStats(ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D,
+          ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+    }
+
+    ThreadPoolExecutor exec = this.workerPool;
+    String currentScaleDirection = lastScaleDirection;
+    lastScaleDirection = EMPTY_STRING;
+
+    int poolSize = exec.getPoolSize();
+    int activeThreads = exec.getActiveCount();
+    int idleThreads = poolSize - activeThreads;
+
+    return new ReadThreadPoolStats(
+        poolSize,                      // Current thread count
+        exec.getMaximumPoolSize(),     // Max allowed threads
+        activeThreads,                 // Busy threads
+        idleThreads,                   // Idle threads
+        jvmCpuLoad,             // JVM CPU usage (ratio)
+        ResourceUtilizationUtils.getSystemCpuLoad(),     // System CPU usage (ratio)
+        ResourceUtilizationUtils.getAvailableHeapMemory(),      // Free heap (GB)
+        ResourceUtilizationUtils.getCommittedHeapMemory(),      // Committed heap (GB)
+        ResourceUtilizationUtils.getUsedHeapMemory(),   // Used heap (GB)
+        ResourceUtilizationUtils.getMaxHeapMemory(),    // Max heap (GB)
+        ResourceUtilizationUtils.getMemoryLoad(),                    // used/max
+        currentScaleDirection,         // "I", "D", or ""
+        getMaxJvmCpuUtilization(),             // Peak JVM CPU usage so far
+        ResourceUtilizationUtils.getJvmProcessId()            // JVM process id.
+    );
+  }
 }
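
A small worked example of the resize arithmetic used in adjustThreadPool(), assuming an upscale percentage of 20 and a downscale percentage of 10 (the real values come from AbfsConfiguration and are not shown in this hunk):

    int currentPoolSize = 10;
    double upscalePercentage = 20, downscalePercentage = 10;   // illustrative values
    int upscaled = (int) Math.ceil(
        (currentPoolSize * (100.0 + upscalePercentage)) / 100.0);           // 12 threads
    int downscaled = Math.max(1 /* minThreadPoolSize */, (int) Math.ceil(
        (currentPoolSize * (100.0 - downscalePercentage)) / 100.0));        // 9 threads
    // Hitting a bound is not treated as a resize: the run is recorded as
    // SCALE_DIRECTION_NO_UP_AT_MAX or SCALE_DIRECTION_NO_DOWN_AT_MIN and the pool is left alone,
    // while a genuine resize stores SCALE_DIRECTION_UP or SCALE_DIRECTION_DOWN for the next snapshot.
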
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
new file mode 100644
index 0000000..4fa0712
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_ACTION_NEEDED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_NONE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_UP;
+
+/**
+ * Represents current statistics of the thread pool and system.
+ */
+public abstract class ResourceUtilizationStats {
+
+  private final int currentPoolSize;  // Current number of threads in the pool
+  private final int maxPoolSize;        // Maximum allowed pool size
+  private final int activeThreads;    // Number of threads currently executing tasks
+  private final int idleThreads;        // Number of threads not executing tasks
+  private final double jvmCpuLoad;    // Current JVM CPU load (0.0–1.0)
+  private final double systemCpuUtilization; // Current system CPU utilization (0.0–1.0)
+  private final double availableHeapGB;       // Available heap memory (GB)
+  private final double committedHeapGB;  // Total committed heap memory (GB)
+  private final double usedHeapGB;        // Used heap memory (GB)
+  private final double maxHeapGB;         // Max heap memory (GB)
+  private final double memoryLoad;  // Heap usage ratio (used/max)
+  private final String lastScaleDirection;  // Last resize direction: "I" (increase) or "D" (decrease)
+  private final double maxCpuUtilization; // Peak JVM CPU observed in the current interval
+  private final long jvmProcessId;   // JVM Process ID
+
+  /**
+   * Constructs a {@link ResourceUtilizationStats} instance containing thread pool
+   * metrics and JVM/system resource utilization details.
+   *
+   * @param currentPoolSize the current number of threads in the pool
+   * @param maxPoolSize the maximum number of threads permitted in the pool
+   * @param activeThreads the number of threads actively executing tasks
+   * @param idleThreads the number of idle threads in the pool
+   * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
+   * @param systemCpuUtilization the current system-wide CPU utilization (0.0–1.0)
+   * @param availableHeapGB the available heap memory in gigabytes
+   * @param committedHeapGB the committed heap memory in gigabytes
+   * @param usedHeapGB the used heap memory in gigabytes
+   * @param maxHeapGB the maximum heap memory in gigabytes
+   * @param memoryLoad the JVM memory load (used / max)
+   * @param lastScaleDirection the last scaling action performed: "I" (increase),
+   * "D" (decrease), or empty if no scaling occurred
+   * @param maxCpuUtilization the peak JVM CPU utilization observed during this interval
+   * @param jvmProcessId the process ID of the JVM
+   */
+  public ResourceUtilizationStats(int currentPoolSize,
+      int maxPoolSize, int activeThreads, int idleThreads,
+      double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
+      double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad, String lastScaleDirection,
+      double maxCpuUtilization, long jvmProcessId) {
+    this.currentPoolSize = currentPoolSize;
+    this.maxPoolSize = maxPoolSize;
+    this.activeThreads = activeThreads;
+    this.idleThreads = idleThreads;
+    this.jvmCpuLoad = jvmCpuLoad;
+    this.systemCpuUtilization = systemCpuUtilization;
+    this.availableHeapGB = availableHeapGB;
+    this.committedHeapGB = committedHeapGB;
+    this.usedHeapGB = usedHeapGB;
+    this.maxHeapGB = maxHeapGB;
+    this.memoryLoad = memoryLoad;
+    this.lastScaleDirection = lastScaleDirection;
+    this.maxCpuUtilization = maxCpuUtilization;
+    this.jvmProcessId = jvmProcessId;
+  }
+
+  /** @return the current number of threads in the pool. */
+  public int getCurrentPoolSize() {
+    return currentPoolSize;
+  }
+
+  /** @return the maximum allowed size of the thread pool. */
+  public int getMaxPoolSize() {
+    return maxPoolSize;
+  }
+
+  /** @return the number of threads currently executing tasks. */
+  public int getActiveThreads() {
+    return activeThreads;
+  }
+
+  /** @return the number of threads currently idle. */
+  public int getIdleThreads() {
+    return idleThreads;
+  }
+
+  /** @return the overall system CPU utilization as a fraction (0.0–1.0). */
+  public double getSystemCpuUtilization() {
+    return systemCpuUtilization;
+  }
+
+  /** @return the available heap memory in gigabytes. */
+  public double getMemoryUtilization() {
+    return availableHeapGB;
+  }
+
+  /** @return the total committed heap memory in gigabytes */
+  public double getCommittedHeapGB() {
+    return committedHeapGB;
+  }
+
+  /** @return the used heap memory in gigabytes */
+  public double getUsedHeapGB() {
+    return usedHeapGB;
+  }
+
+  /** @return the max heap memory in gigabytes */
+  public double getMaxHeapGB() {
+    return maxHeapGB;
+  }
+
+  /** @return the current JVM memory load (used / max) as a value between 0.0 and 1.0 */
+  public double getMemoryLoad() {
+    return memoryLoad;
+  }
+
+  /** @return "I" (increase), "D" (decrease), or empty. */
+  public String getLastScaleDirection() {
+    return lastScaleDirection;
+  }
+
+  /** @return the JVM process CPU load as a fraction (0.0–1.0). */
+  public double getJvmCpuLoad() {
+    return jvmCpuLoad;
+  }
+
+  /** @return the peak JVM process CPU load observed, as a fraction (0.0–1.0). */
+  public double getMaxCpuUtilization() {
+    return maxCpuUtilization;
+  }
+
+  /** @return the JVM process ID. */
+  public long getJvmProcessId() {
+    return jvmProcessId;
+  }
+
+  /**
+   * Converts a scale direction marker into its numeric code.
+   *
+   * @param lastScaleDirection the scale direction marker ("I", "D", a no-scaling marker, or empty)
+   * @return SCALE_UP or SCALE_DOWN for a resize, NO_SCALE_UP_AT_MAX / NO_SCALE_DOWN_AT_MIN when
+   * already at a bound, NO_ACTION_NEEDED when nothing had to change, and SCALE_NONE otherwise
+   */
+  public int getLastScaleDirectionNumeric(String lastScaleDirection) {
+    switch (lastScaleDirection) {
+    case SCALE_DIRECTION_UP:
+      return SCALE_UP;    // Scaled up
+    case SCALE_DIRECTION_DOWN:
+      return SCALE_DOWN;   // Scaled down
+    case SCALE_DIRECTION_NO_DOWN_AT_MIN:
+      return NO_SCALE_DOWN_AT_MIN;   // Attempted down-scale, already at minimum
+    case SCALE_DIRECTION_NO_UP_AT_MAX:
+      return NO_SCALE_UP_AT_MAX;    // Attempted up-scale, already at maximum
+    case SCALE_DIRECTION_NO_ACTION_NEEDED:
+      return NO_ACTION_NEEDED; // No action needed
+    default:
+      return SCALE_NONE;  // No scaling
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "currentPoolSize=%d, maxPoolSize=%d, activeThreads=%d, idleThreads=%d, "
+            + "jvmCpuLoad=%.2f%%, systemCpuUtilization=%.2f%%, "
+            + "availableHeap=%.2fGB, committedHeap=%.2fGB, memoryLoad=%.2f%%, "
+            + "scaleDirection=%s, maxCpuUtilization=%.2f%%, jvmProcessId=%d",
+        currentPoolSize, maxPoolSize, activeThreads,
+        idleThreads, jvmCpuLoad * HUNDRED_D, systemCpuUtilization * HUNDRED_D,
+        availableHeapGB, committedHeapGB, memoryLoad * HUNDRED_D,
+        lastScaleDirection, maxCpuUtilization * HUNDRED_D, jvmProcessId
+    );
+  }
+}
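
Since the class is abstract, a throwaway anonymous subclass is enough to exercise the mapping and the formatting. A hedged sketch with illustrative values, using the same static imports as the class above:

    ResourceUtilizationStats stats = new ResourceUtilizationStats(
        4, 8, 2, 2, 0.25, 0.40, 2.0, 3.0, 1.0, 4.0, 0.30,
        SCALE_DIRECTION_UP, 0.55, 12345L) { };
    int code = stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection());  // SCALE_UP
    String line = stats.toString();
    // toString() rescales the stored fractions for readability, e.g. jvmCpuLoad=25.00%,
    // while the raw getters keep returning the 0.0-1.0 values that the update() methods expect.
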
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
new file mode 100644
index 0000000..c151a48
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+
+import com.sun.management.OperatingSystemMXBean;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
+
+/**
+ * Utility class for retrieving JVM- and system-level resource utilization
+ * metrics such as CPU load, memory usage, and available heap memory.
+ */
+public final class ResourceUtilizationUtils {
+
+  private ResourceUtilizationUtils() {
+    // Prevent instantiation
+  }
+
+  /**
+   * Calculates the available heap memory in gigabytes.
+   * This method reads the JVM heap usage via {@link MemoryMXBean} and subtracts the
+   * currently used heap from the committed heap (committed - used)
+   * to determine how much heap memory is still available.
+   * The result is rounded up to the nearest gigabyte.
+   *
+   * @return the available heap memory in gigabytes
+   */
+  public static long getAvailableHeapMemory() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    long availableHeapBytes = memoryUsage.getCommitted() - memoryUsage.getUsed();
+    return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+  }
+
+  /**
+   * Returns the currently committed JVM heap memory in gigabytes.
+   * This reflects the amount of heap the JVM has reserved from the OS and may grow as needed.
+   *
+   * @return committed heap memory in gigabytes
+   */
+  @VisibleForTesting
+  public static double getCommittedHeapMemory() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    return (double) memoryUsage.getCommitted() / BYTES_PER_GIGABYTE;
+  }
+
+  /**
+   * Get the current CPU load of the system.
+   * @return the CPU load as a double value between 0.0 and 1.0
+   */
+  @VisibleForTesting
+  public static double getSystemCpuLoad() {
+    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+        OperatingSystemMXBean.class);
+    double cpuLoad = osBean.getSystemCpuLoad();
+    if (cpuLoad < ZERO) {
+      // If the CPU load is not available, return 0.0
+      return ZERO_D;
+    }
+    return cpuLoad;
+  }
+
+
+  /**
+   * Gets the current CPU load of the JVM process.
+   *
+   * @return the JVM process CPU load as a fraction (0.0 to 1.0), or 0.0 if unavailable.
+   */
+  @VisibleForTesting
+  public static double getJvmCpuLoad() {
+    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+        OperatingSystemMXBean.class);
+    double cpuLoad = osBean.getProcessCpuLoad();
+    if (cpuLoad < ZERO) {
+      return ZERO_D;
+    }
+    return cpuLoad;
+  }
+
+  /**
+   * Gets the current heap memory load of the JVM, computed as used heap divided by maximum heap.
+   * @return the heap memory load as a double value between 0.0 and 1.0
+   */
+  @VisibleForTesting
+  public static double getMemoryLoad() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+  }
+
+  /**
+   * Calculates the used heap memory in gigabytes.
+   * This method returns the amount of heap memory currently used by the JVM.
+   * The result is rounded up to the nearest gigabyte.
+   *
+   * @return the used heap memory in gigabytes
+   */
+  public static long getUsedHeapMemory() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    long usedHeapBytes = memoryUsage.getUsed();
+    return (usedHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+  }
+
+  /**
+   * Calculates the maximum heap memory allowed for the JVM in gigabytes.
+   * This is the upper bound the JVM may expand its heap to.
+   *
+   * @return the maximum heap memory in gigabytes
+   */
+  public static long getMaxHeapMemory() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    long maxHeapBytes = memoryUsage.getMax();
+    return (maxHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+  }
+
+
+  /**
+   * Returns the process ID (PID) of the currently running JVM.
+   * This method uses {@link ProcessHandle#current()} to obtain the ID of the
+   * Java process.
+   *
+   * @return the PID of the current JVM process
+   */
+  public static long getJvmProcessId() {
+    return ProcessHandle.current().pid();
+  }
+
+  /**
+   * Calculates the heap memory still available relative to the maximum heap, in gigabytes.
+   * This method uses {@link MemoryMXBean#getHeapMemoryUsage()} to obtain the maximum heap memory
+   * allowed for the JVM and subtracts the currently used memory (max - used)
+   * to determine how much heap memory is still available.
+   * The result is rounded up to the nearest gigabyte.
+   *
+   * @return the available heap memory in gigabytes
+   */
+  public static long getAvailableMaxHeapMemory() {
+    MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+    long availableHeapBytes = memoryUsage.getMax() - memoryUsage.getUsed();
+    return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+  }
+}
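The methods above are plain static helpers, so a caller that wants a point-in-time resource snapshot can simply combine them. A minimal sketch follows; the SnapshotExample class name and the log format are illustrative only and not part of this patch:

import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;

/** Illustrative only: prints a one-line resource snapshot using the new utility methods. */
public final class SnapshotExample {
  public static void main(String[] args) {
    // CPU and memory loads are fractions in [0.0, 1.0]; multiply by 100 for percentages.
    double jvmCpu = ResourceUtilizationUtils.getJvmCpuLoad();
    double systemCpu = ResourceUtilizationUtils.getSystemCpuLoad();
    double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
    // Heap figures are reported in whole gigabytes, rounded up.
    long usedHeapGb = ResourceUtilizationUtils.getUsedHeapMemory();
    long availableHeapGb = ResourceUtilizationUtils.getAvailableHeapMemory();
    long maxHeapGb = ResourceUtilizationUtils.getMaxHeapMemory();
    System.out.printf("pid=%d jvmCpu=%.2f%% sysCpu=%.2f%% memLoad=%.2f%% "
            + "heap(used/avail/max)=%d/%d/%dGB%n",
        ResourceUtilizationUtils.getJvmProcessId(),
        jvmCpu * 100, systemCpu * 100, memoryLoad * 100,
        usedHeapGb, availableHeapGb, maxHeapGb);
  }
}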
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 62ac76f..8decba9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -69,8 +69,8 @@ public class TracingContext {
   private String ingressHandler = EMPTY_STRING;
   private String position = EMPTY_STRING; // position of read/write in remote file
   private String metricResults = EMPTY_STRING;
-  private String metricHeader = EMPTY_STRING;
   private ReadType readType = ReadType.UNKNOWN_READ;
+  private String resourceUtilizationMetricResults = EMPTY_STRING;
 
   /**
    * If {@link #primaryRequestId} is null, this field shall be set equal
@@ -129,6 +129,14 @@ public TracingContext(String clientCorrelationID, String fileSystemID,
     this.metricResults = metricResults;
   }
 
+  public TracingContext(String clientCorrelationID, String fileSystemID,
+      FSOperationType opType, boolean needsPrimaryReqId,
+      TracingHeaderFormat tracingHeaderFormat, Listener listener,
+      String metricResults, String resourceUtilizationMetricResults) {
+    this(clientCorrelationID, fileSystemID, opType, needsPrimaryReqId,
+        tracingHeaderFormat, listener, metricResults);
+    this.resourceUtilizationMetricResults = resourceUtilizationMetricResults;
+  }
 
   public TracingContext(TracingContext originalTracingContext) {
     this.fileSystemID = originalTracingContext.fileSystemID;
@@ -146,7 +154,9 @@ public TracingContext(TracingContext originalTracingContext) {
     }
     this.metricResults = originalTracingContext.metricResults;
     this.readType = originalTracingContext.readType;
+    this.resourceUtilizationMetricResults = originalTracingContext.resourceUtilizationMetricResults;
   }
+
   public static String validateClientCorrelationID(String clientCorrelationID) {
     if ((clientCorrelationID.length() > MAX_CLIENT_CORRELATION_ID_LENGTH)
         || (!clientCorrelationID.matches(CLIENT_CORRELATION_ID_PATTERN))) {
@@ -226,28 +236,22 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
           + position + COLON
           + operatedBlobCount + COLON
           + getOperationSpecificHeader(opType) + COLON
-          + httpOperation.getTracingContextSuffix();
-
-      metricHeader += !(metricResults.trim().isEmpty()) ? metricResults  : EMPTY_STRING;
+          + httpOperation.getTracingContextSuffix() + COLON
+          + metricResults + COLON + resourceUtilizationMetricResults;
       break;
     case TWO_ID_FORMAT:
       header = TracingHeaderVersion.getCurrentVersion() + COLON
           + clientCorrelationID + COLON + clientRequestId;
-      metricHeader += !(metricResults.trim().isEmpty()) ? metricResults  : EMPTY_STRING;
       break;
     default:
       //case SINGLE_ID_FORMAT
       header = TracingHeaderVersion.getCurrentVersion() + COLON
           + clientRequestId;
-      metricHeader += !(metricResults.trim().isEmpty()) ? metricResults  : EMPTY_STRING;
     }
     if (listener != null) { //for testing
       listener.callTracingHeaderValidator(header, format);
     }
     httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header);
-    if (!metricHeader.equals(EMPTY_STRING)) {
-      httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS, metricHeader);
-    }
     /*
     * In case the primaryRequestId is an empty-string and if it is the first try to
     * API call (previousFailure shall be null), maintain the last part of clientRequestId's
@@ -398,4 +402,12 @@ public void setReadType(ReadType readType) {
   public ReadType getReadType() {
     return readType;
   }
+
+  /**
+   * Sets the resource utilization metric results string used for tracing or logging.
+   * @param resourceUtilizationMetricResults the formatted metric data to store.
+   */
+  public void setResourceUtilizationMetricResults(final String resourceUtilizationMetricResults) {
+    this.resourceUtilizationMetricResults = resourceUtilizationMetricResults;
+  }
 }
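For context, the new constructor and setter above let a caller attach a pre-formatted resource-utilization string to the tracing context. A sketch of a possible call site follows; the class name, literal IDs, and the metrics string format are placeholders, not the patch's actual producers:

import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;

/** Illustrative only: shows how a caller could wire the new resource-utilization field. */
public final class TracingContextExample {
  public static void main(String[] args) {
    // Placeholder metrics string; the real producers format this elsewhere in the patch.
    String resourceMetrics = "cpu=" + ResourceUtilizationUtils.getJvmCpuLoad()
        + ";mem=" + ResourceUtilizationUtils.getMemoryLoad();
    TracingContext tc = new TracingContext("client-correlation-id", "filesystem-id",
        FSOperationType.WRITE, true, TracingHeaderFormat.ALL_ID_FORMAT,
        null /* listener */, "" /* metricResults */, resourceMetrics);
    // The value can also be refreshed on an existing context before the next request.
    tc.setResourceUtilizationMetricResults(resourceMetrics);
  }
}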
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
index 4e5a2d6..6ce0299 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
@@ -38,7 +38,16 @@ public enum TracingHeaderVersion {
    *         :primaryRequestId:streamId:opType:retryHeader:ingressHandler
    *         :position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
    */
-  V1("v1", 13);
+  V1("v1", 13),
+  /**
+   * Version 2 of the tracing header, which includes a version prefix and has 15 permanent fields.
+   * This version is used for the current tracing header schema.
+   * Schema: version:clientCorrelationId:clientRequestId:fileSystemId
+   *         :primaryRequestId:streamId:opType:retryHeader:ingressHandler
+   *         :position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
+   *         :aggregatedMetrics:resourceUtilizationMetrics
+   */
+  V2("v2", 15);
 
   private final String versionString;
   private final int fieldCount;
@@ -59,7 +68,7 @@ public String toString() {
    * @return the latest version of the tracing header.
    */
   public static TracingHeaderVersion getCurrentVersion() {
-    return V1;
+    return V2;
   }
 
   public int getFieldCount() {
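As a quick sanity check on the version bump, downstream test code could assert the new defaults rather than hard-coding header shapes. This is a sketch only, not part of this patch; the AssertJ style mirrors the tests below:

import org.assertj.core.api.Assertions;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderVersion;

/** Illustrative only: confirms the current tracing header version and field count. */
public final class TracingHeaderVersionCheck {
  public static void main(String[] args) {
    TracingHeaderVersion current = TracingHeaderVersion.getCurrentVersion();
    Assertions.assertThat(current)
        .as("Current tracing header version should be v2")
        .isEqualTo(TracingHeaderVersion.V2);
    Assertions.assertThat(current.getFieldCount())
        .as("v2 header carries 15 colon-separated fields")
        .isEqualTo(15);
  }
}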
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
index 3d4c9aa..a332c30 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
@@ -36,10 +36,19 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -49,6 +58,7 @@ class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
   private AbfsConfiguration mockConfig;
   private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
   private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
+  private static final int LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 100;
   private static final int THREAD_SLEEP_DURATION_MS = 200;
   private static final String TEST_FILE_PATH = "testFilePath";
   private static final String TEST_DIR_PATH = "testDirPath";
@@ -95,19 +105,25 @@ public void setUp() {
     when(mockConfig.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
     when(mockConfig.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
     when(mockConfig.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
+    when(mockConfig.getWriteLowMemoryUsageThresholdPercent()).thenReturn(LOW_MEMORY_USAGE_THRESHOLD_PERCENT);
   }
 
   /**
-   * Ensures that {@link WriteThreadPoolSizeManager#getInstance(String, AbfsConfiguration)} returns a singleton per key.
+   * Verifies that {@link WriteThreadPoolSizeManager#getInstance(String, AbfsConfiguration, AbfsCounters)}
+   * returns the same singleton instance for the same filesystem name, and a different instance
+   * for a different filesystem name.
    */
   @Test
-  void testGetInstanceReturnsSingleton() {
+  void testGetInstanceReturnsSingleton() throws IOException {
     WriteThreadPoolSizeManager instance1
-        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
+        getFileSystem().getAbfsClient().getAbfsCounters());
     WriteThreadPoolSizeManager instance2
-        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+        = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
+        getFileSystem().getAbfsClient().getAbfsCounters());
     WriteThreadPoolSizeManager instance3 =
-        WriteThreadPoolSizeManager.getInstance("newFs", mockConfig);
+        WriteThreadPoolSizeManager.getInstance("newFs", mockConfig,
+            getFileSystem().getAbfsClient().getAbfsCounters());
     Assertions.assertThat(instance1)
         .as("Expected the same singleton instance for the same key")
         .isSameAs(instance2);
@@ -117,30 +133,37 @@ void testGetInstanceReturnsSingleton() {
   }
 
   /**
-   /**
    * Tests that high CPU usage results in thread pool downscaling.
    */
   @Test
   void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, IOException {
-    // Get the executor service (ThreadPoolExecutor)
-    WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testfsHigh",
-        getAbfsStore(getFileSystem()).getAbfsConfiguration());
-    ExecutorService executor = instance.getExecutorService();
-    ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+    // Initialize filesystem and thread pool manager
+    Configuration conf = getRawConfiguration();
+    conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
+    FileSystem fileSystem = FileSystem.newInstance(conf);
+    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+      // Get the executor service (ThreadPoolExecutor)
+      WriteThreadPoolSizeManager instance
+          = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+          getAbfsStore(abfs).getAbfsConfiguration(),
+          abfs.getAbfsClient().getAbfsCounters());
+      ExecutorService executor = instance.getExecutorService();
+      ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
 
-    // Simulate high CPU usage (e.g., 95% CPU utilization)
-    int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
-    instance.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_UTILIZATION_THRESHOLD);  // High CPU
+      // Simulate high CPU usage (e.g., 95% CPU utilization)
+      int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
+      instance.adjustThreadPoolSizeBasedOnCPU(
+          HIGH_CPU_UTILIZATION_THRESHOLD);  // High CPU
 
-    // Get the new maximum pool size after adjustment
-    int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
+      // Get the new maximum pool size after adjustment
+      int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
 
-    // Assert that the pool size has decreased or is equal to initial PoolSize based on high CPU usage
-    Assertions.assertThat(newMaxSize)
-        .as("Expected pool size to decrease under high CPU usage")
-        .isLessThanOrEqualTo(initialMaxSize);
-    instance.close();
+      // Assert that the pool size has decreased or is equal to initial PoolSize based on high CPU usage
+      Assertions.assertThat(newMaxSize)
+          .as("Expected pool size to decrease under high CPU usage")
+          .isLessThanOrEqualTo(initialMaxSize);
+      instance.close();
+    }
   }
 
   /**
@@ -149,17 +172,24 @@ void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, IOExc
   @Test
   void testAdjustThreadPoolSizeBasedOnLowCPU()
       throws InterruptedException, IOException {
-    WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testfsLow",
-        getAbfsStore(getFileSystem()).getAbfsConfiguration());
-    ExecutorService executor = instance.getExecutorService();
-    int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
-    instance.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_UTILIZATION_THRESHOLD); // Low CPU
-    int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
-    Assertions.assertThat(newSize)
-        .as("Expected pool size to increase or stay the same under low CPU usage")
-        .isGreaterThanOrEqualTo(initialSize);
-    instance.close();
+    Configuration conf = getRawConfiguration();
+    conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
+    FileSystem fileSystem = FileSystem.newInstance(conf);
+    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+      WriteThreadPoolSizeManager instance
+          = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+          getAbfsStore(abfs).getAbfsConfiguration(),
+          abfs.getAbfsClient().getAbfsCounters());
+      ExecutorService executor = instance.getExecutorService();
+      int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
+      instance.adjustThreadPoolSizeBasedOnCPU(
+          LOW_CPU_UTILIZATION_THRESHOLD); // Low CPU
+      int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
+      Assertions.assertThat(newSize)
+          .as("Expected pool size to increase or stay the same under low CPU usage")
+          .isGreaterThanOrEqualTo(initialSize);
+      instance.close();
+    }
   }
 
 
@@ -169,7 +199,8 @@ void testAdjustThreadPoolSizeBasedOnLowCPU()
   @Test
   void testExecutorServiceIsNotNull() throws IOException {
     WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig);
+        = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig,
+        getFileSystem().getAbfsClient().getAbfsCounters());
     ExecutorService executor = instance.getExecutorService();
     Assertions.assertThat(executor).as("Executor service should be initialized")
         .isNotNull();
@@ -186,7 +217,8 @@ void testExecutorServiceIsNotNull() throws IOException {
   @Test
   void testCloseCleansUp() throws Exception {
     WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig);
+        = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig,
+        getFileSystem().getAbfsClient().getAbfsCounters());
     ExecutorService executor = instance.getExecutorService();
     instance.close();
     Assertions.assertThat(executor.isShutdown() || executor.isTerminated())
@@ -206,7 +238,8 @@ void testStartCPUMonitoringSchedulesTask()
       throws InterruptedException, IOException {
     // Create a new instance of WriteThreadPoolSizeManager using a mock configuration
     WriteThreadPoolSizeManager instance
-        = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig);
+        = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig,
+        getFileSystem().getAbfsClient().getAbfsCounters());
 
     // Call startCPUMonitoring to schedule the monitoring task
     instance.startCPUMonitoring();
@@ -241,7 +274,8 @@ void testABFSWritesUnderCPUStress() throws Exception {
     // Initialize the filesystem and thread pool manager
     AzureBlobFileSystem fs = getFileSystem();
     WriteThreadPoolSizeManager instance =
-        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), getConfiguration());
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(),
+            getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
     ThreadPoolExecutor executor =
         (ThreadPoolExecutor) instance.getExecutorService();
 
@@ -323,7 +357,8 @@ void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
     // Initialize filesystem and thread pool manager
     AzureBlobFileSystem fs = getFileSystem();
     WriteThreadPoolSizeManager mgr =
-        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig);
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig,
+            getFileSystem().getAbfsClient().getAbfsCounters());
     ThreadPoolExecutor executor = (ThreadPoolExecutor) mgr.getExecutorService();
 
     // Enable monitoring (may not be required if adjust() is triggered internally)
@@ -498,7 +533,8 @@ void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
     try (FileSystem fileSystem = FileSystem.newInstance(getRawConfiguration())) {
       AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
       WriteThreadPoolSizeManager instance =
-          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(), getConfiguration());
+          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+              getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
       ThreadPoolExecutor executor =
           (ThreadPoolExecutor) instance.getExecutorService();
 
@@ -602,7 +638,7 @@ void testScalesDownOnParallelHighMemoryLoad() throws Exception {
       AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
       WriteThreadPoolSizeManager instance =
           WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
-              getConfiguration());
+              getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
       ThreadPoolExecutor executor =
           (ThreadPoolExecutor) instance.getExecutorService();
 
@@ -717,7 +753,7 @@ void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
         getRawConfiguration())) {
       AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
       WriteThreadPoolSizeManager instance = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
-              abfs.getAbfsStore().getAbfsConfiguration());
+          abfs.getAbfsStore().getAbfsConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
       ThreadPoolExecutor executor =
           (ThreadPoolExecutor) instance.getExecutorService();
 
@@ -766,5 +802,101 @@ void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
       instance.close();
     }
   }
+
+  /**
+   * Verifies that when the system experiences low CPU usage,
+   * the WriteThreadPoolSizeManager maintains the thread pool size
+   * without scaling down and updates the corresponding
+   * write thread pool metrics accordingly.
+   */
+  @Test
+  void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
+      throws Exception {
+    // Initialize filesystem and thread pool manager
+    Configuration conf = getRawConfiguration();
+    conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
+    conf.setInt(AZURE_WRITE_MAX_CONCURRENT_REQUESTS, 2);
+    conf.setInt(FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT, 10);
+    conf.setInt(FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
+    FileSystem fileSystem = FileSystem.newInstance(conf);
+    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+      WriteThreadPoolSizeManager instance =
+          WriteThreadPoolSizeManager.getInstance("fs1",
+              abfs.getAbfsStore().getAbfsConfiguration(),
+              abfs.getAbfsClient().getAbfsCounters());
+      instance.startCPUMonitoring();
+
+      // --- Capture initial metrics and stats ---
+      AbfsWriteResourceUtilizationMetrics metrics =
+          abfs.getAbfsClient()
+              .getAbfsCounters()
+              .getAbfsWriteResourceUtilizationMetrics();
+
+      WriteThreadPoolSizeManager.WriteThreadPoolStats statsBefore =
+          instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(), ResourceUtilizationUtils.getMemoryLoad());
+
+      ThreadPoolExecutor executor =
+          (ThreadPoolExecutor) instance.getExecutorService();
+
+      // No CPU hogs this time — simulate light CPU load
+      // Submit lightweight ABFS tasks that barely use CPU
+      int taskCount = 10;
+      CountDownLatch latch = new CountDownLatch(taskCount);
+
+      for (int i = 0; i < taskCount; i++) {
+        executor.submit(() -> {
+          try {
+            // Light operations — minimal CPU load
+            for (int j = 0; j < 3; j++) {
+              Thread.sleep(HUNDRED); // simulate idle/light wait
+            }
+          } catch (Exception e) {
+            Assertions.fail("Light task failed unexpectedly", e);
+          } finally {
+            latch.countDown();
+          }
+        });
+      }
+
+      // Wait for all tasks to finish
+      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      Assertions.assertThat(finished)
+          .as("All lightweight tasks should complete normally")
+          .isTrue();
+
+      // Allow some time for monitoring and metrics update
+      Thread.sleep(SLEEP_DURATION_30S_MS);
+
+      WriteThreadPoolSizeManager.WriteThreadPoolStats statsAfter =
+          instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(), ResourceUtilizationUtils.getMemoryLoad());
+
+      //--- Validate that metrics and stats changed ---
+      Assertions.assertThat(statsAfter)
+          .as("Thread pool stats should update after CPU load")
+          .isNotEqualTo(statsBefore);
+
+      String metricsOutput = metrics.toString();
+
+      if (!metricsOutput.isEmpty()) {
+        // Assertions for metrics correctness
+        Assertions.assertThat(metricsOutput)
+            .as("Metrics output should not be empty")
+            .isNotEmpty();
+
+        Assertions.assertThat(metricsOutput)
+            .as("Metrics must include CPU utilization data")
+            .contains("SC=");
+
+        Assertions.assertThat(metricsOutput)
+            .as("Metrics must include memory utilization data")
+            .contains("AM=");
+
+        Assertions.assertThat(metricsOutput)
+            .as("Metrics must include current thread pool size")
+            .contains("CP=");
+      }
+      instance.close();
+    }
+  }
 }
 
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index a5fbe30..4bf1f56 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -124,6 +124,8 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
   public static final int REDUCED_BACKOFF_INTERVAL = 100;
   public static final int BUFFER_LENGTH = 5;
   public static final int BUFFER_OFFSET = 0;
+  private static final String RANDOM_URI = "abcd";
+  private static final String RANDOM_FILESYSTEM_ID = "abcde";
 
   private final Pattern userAgentStringPattern;
 
@@ -174,7 +176,7 @@ public ITestAbfsClient(HttpOperationType pHttpOperationType) throws Exception {
 
   private String getUserAgentString(AbfsConfiguration config,
       boolean includeSSLProvider) throws IOException, URISyntaxException {
-    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
     AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
     AbfsClient client;
     if (AbfsServiceType.DFS.equals(config.getFsConfiguredServiceType())) {
@@ -411,7 +413,7 @@ public static AbfsClient createTestClientFromCurrentContext(
     AbfsPerfTracker tracker = new AbfsPerfTracker("test",
         abfsConfig.getAccountName(),
         abfsConfig);
-    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
 
     AbfsClientContext abfsClientContext =
         new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
@@ -468,7 +470,7 @@ public static AbfsClient createBlobClientFromCurrentContext(
     AbfsPerfTracker tracker = new AbfsPerfTracker("test",
         abfsConfig.getAccountName(),
         abfsConfig);
-    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
 
     AbfsClientContext abfsClientContext =
         new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
@@ -500,7 +502,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
       AbfsConfiguration abfsConfig) throws Exception {
     AuthType currentAuthType = abfsConfig.getAuthType(
         abfsConfig.getAccountName());
-    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
 
     assumeThat(currentAuthType)
         .as("Auth type must be SharedKey or OAuth for this test")
@@ -750,8 +752,8 @@ public void testExpectHundredContinue() throws Exception {
             Mockito.nullable(int.class), Mockito.nullable(int.class),
             Mockito.any());
 
-    TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
-        "abcde", FSOperationType.APPEND,
+    TracingContext tracingContext = Mockito.spy(new TracingContext(RANDOM_URI,
+        RANDOM_FILESYSTEM_ID, FSOperationType.APPEND,
         TracingHeaderFormat.ALL_ID_FORMAT, null));
 
     // Check that expect header is enabled before the append call.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
index a0248ee..db5f596 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
@@ -149,7 +149,7 @@ public void testApacheClientFallbackDuringConnectionWarmup()
               .build(),
           new AbfsHttpClientConnectionFactory(), keepAliveCache,
           new AbfsConfiguration(new Configuration(), EMPTY_STRING),
-          new URL("https://test.com"), true);
+          new URL("https://abcd.com"), true);
 
       Assertions.assertThat(AbfsApacheHttpClient.usable())
           .describedAs("Apache HttpClient should be not usable")
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index 84b0fbd..e663db9 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -172,7 +172,7 @@ private ReadBufferManager getBufferManager(AzureBlobFileSystem fs) {
     if (getConfiguration().isReadAheadV2Enabled()) {
       ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize,
           getConfiguration());
-      return ReadBufferManagerV2.getBufferManager();
+      return ReadBufferManagerV2.getBufferManager(fs.getAbfsStore().getClient().getAbfsCounters());
     }
     ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize);
     return ReadBufferManagerV1.getBufferManager();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 4a902a8..93df652 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -194,7 +194,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
     return inputStream;
   }
 
-  void queueReadAheads(AbfsInputStream inputStream) {
+  void queueReadAheads(AbfsInputStream inputStream) throws IOException {
     // Mimic AbfsInputStream readAhead queue requests
     getBufferManager()
         .queueReadAhead(inputStream, 0, ONE_KB, inputStream.getTracingContext());
@@ -1215,7 +1215,8 @@ private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
     return fs;
   }
 
-  private void resetReadBufferManager(int bufferSize, int threshold) {
+  private void resetReadBufferManager(int bufferSize, int threshold)
+      throws IOException {
     getBufferManager()
         .testResetReadBufferManager(bufferSize, threshold);
     // Trigger GC as aggressive recreation of ReadBufferManager buffers
@@ -1223,11 +1224,11 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
     System.gc();
   }
 
-  private ReadBufferManager getBufferManager() {
+  private ReadBufferManager getBufferManager() throws IOException {
     if (getConfiguration().isReadAheadV2Enabled()) {
       ReadBufferManagerV2.setReadBufferManagerConfigs(
           getConfiguration().getReadAheadBlockSize(), getConfiguration());
-      return ReadBufferManagerV2.getBufferManager();
+      return ReadBufferManagerV2.getBufferManager(getFileSystem().getAbfsStore().getClient().getAbfsCounters());
     }
     return ReadBufferManagerV1.getBufferManager();
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 146eed8..2129f5e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -31,6 +32,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -111,7 +113,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
    */
   @Test
   public void verifyShortWriteRequest() throws Exception {
-
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
     AbfsDfsClient client = mock(AbfsDfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -122,6 +124,8 @@ public void verifyShortWriteRequest() throws Exception {
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.getAbfsConfiguration()).thenReturn(abfsConf);
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
     when(client.append(anyString(), any(byte[].class),
         any(AppendRequestParameters.class), any(),
         any(), any(TracingContext.class)))
@@ -178,7 +182,7 @@ public void verifyShortWriteRequest() throws Exception {
    */
   @Test
   public void verifyWriteRequest() throws Exception {
-
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
     AbfsDfsClient client = mock(AbfsDfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -192,7 +196,8 @@ public void verifyWriteRequest() throws Exception {
     TracingContext tracingContext = new TracingContext("test-corr-id",
         "test-fs-id", FSOperationType.WRITE,
         TracingHeaderFormat.ALL_ID_FORMAT, null);
-
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))).thenReturn(op);
     when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
@@ -256,7 +261,7 @@ public void verifyWriteRequest() throws Exception {
    */
   @Test
   public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
-
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
     AbfsDfsClient client = mock(AbfsDfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -278,6 +283,8 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
     when(op.getResult()).thenReturn(httpOp);
     when(clientHandler.getClient(any())).thenReturn(client);
     when(clientHandler.getDfsClient()).thenReturn(client);
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
 
 
     AbfsOutputStream out = Mockito.spy(Mockito.spy(new AbfsOutputStream(
@@ -337,7 +344,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
    */
   @Test
   public void verifyWriteRequestOfBufferSize() throws Exception {
-
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
     AbfsDfsClient client = mock(AbfsDfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -359,6 +366,8 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
     when(op.getResult()).thenReturn(httpOp);
     when(clientHandler.getClient(any())).thenReturn(client);
     when(clientHandler.getDfsClient()).thenReturn(client);
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
 
     AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
         populateAbfsOutputStreamContext(
@@ -402,7 +411,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
    */
   @Test
   public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
-
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
     AbfsDfsClient client = mock(AbfsDfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -421,6 +430,8 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
         isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
     when(clientHandler.getClient(any())).thenReturn(client);
     when(clientHandler.getDfsClient()).thenReturn(client);
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
     AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
         populateAbfsOutputStreamContext(
             BUFFER_SIZE,
@@ -466,7 +477,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
    */
   @Test
   public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
-
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
     AbfsDfsClient client = mock(AbfsDfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -480,7 +491,8 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
     TracingContext tracingContext = new TracingContext(
         abfsConf.getClientCorrelationId(), "test-fs-id",
         FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null);
-
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class),
         any(AppendRequestParameters.class), any(), any(), any(TracingContext.class)))
@@ -547,7 +559,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
    */
   @Test
   public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
-
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
     AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
     AbfsDfsClient client = mock(AbfsDfsClient.class);
     AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -556,6 +568,8 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
     conf.set(accountKey1, accountValue1);
     abfsConf = new AbfsConfiguration(conf, accountName1);
     when(client.getAbfsConfiguration()).thenReturn(abfsConf);
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
     AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
     when(client.getAbfsPerfTracker()).thenReturn(tracker);
     when(client.append(anyString(), any(byte[].class),
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
index 77fbb76..77ae7ff 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,7 @@
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
 
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
@@ -61,17 +63,18 @@ public TestReadBufferManagerV2() throws Exception {
    */
   @Test
   public void testReadBufferManagerV2Init() throws Exception {
+    AbfsClient abfsClient = getFileSystem().getAbfsStore().getClient();
     ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
-    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
     assertThat(ReadBufferManagerV2.getInstance())
         .as("ReadBufferManager should be uninitialized").isNull();
     intercept(IllegalStateException.class, "ReadBufferManagerV2 is not configured.", () -> {
-      ReadBufferManagerV2.getBufferManager();
+      ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
     });
     // verify that multiple invocations of getBufferManager returns same instance.
     ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
-    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
-    ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
+    ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
     ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
     assertThat(bufferManager).isNotNull();
     assertThat(bufferManager2).isNotNull();
@@ -94,11 +97,12 @@ public void testDynamicScalingSwitchingOnAndOff() throws Exception {
     conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
     conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
     try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsClient abfsClient = fs.getAbfsStore().getClient();
       AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
       ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
-      ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+      ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
       ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
-      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
       assertThat(bufferManagerV2.getCpuMonitoringThread())
           .as("CPU Monitor thread should be initialized").isNotNull();
       bufferManagerV2.resetBufferManager();
@@ -106,11 +110,12 @@ public void testDynamicScalingSwitchingOnAndOff() throws Exception {
 
     conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
     try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+      AbfsClient abfsClient = fs.getAbfsStore().getClient();
       AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
       ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
-      ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+      ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
       ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
-      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
       assertThat(bufferManagerV2.getCpuMonitoringThread())
           .as("CPU Monitor thread should not be initialized").isNull();
       bufferManagerV2.resetBufferManager();
@@ -127,9 +132,9 @@ public void testThreadPoolDynamicScaling() throws Exception {
     AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
         getAccountName());
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
     int[] reqOffset = {0};
     int reqLength = 1;
@@ -142,11 +147,11 @@ public void testThreadPoolDynamicScaling() throws Exception {
     });
     t.start();
     Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
-    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isGreaterThan(2);
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isGreaterThanOrEqualTo(2);
     running = false;
     t.join();
     Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
-    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4);
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThanOrEqualTo(4);
   }
 
   @Test
@@ -159,9 +164,9 @@ public void testCpuUpscaleNotAllowedIfCpuAboveThreshold() throws Exception {
     AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
         getAccountName());
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
     int[] reqOffset = {0};
     int reqLength = 1;
@@ -188,9 +193,9 @@ public void testScheduledEviction() throws Exception {
     Configuration configuration = getReadAheadV2Configuration();
     AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
         getAccountName());
-    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     // Add a failed buffer to completed queue and set to no free buffers to read ahead.
     ReadBuffer buff = new ReadBuffer();
     buff.setStatus(ReadBufferStatus.READ_FAILED);
@@ -212,9 +217,9 @@ public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws Exception
     AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
         getAccountName());
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     // Add a failed buffer to completed queue and set to no free buffers to read ahead.
     ReadBuffer buff = new ReadBuffer();
     buff.setStatus(ReadBufferStatus.READ_FAILED);
@@ -236,9 +241,9 @@ public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception {
     AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
         getAccountName());
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
     // Add a failed buffer to completed queue and set to no free buffers to read ahead.
     ReadBuffer buff = new ReadBuffer();
     buff.setStatus(ReadBufferStatus.READ_FAILED);
@@ -256,10 +261,11 @@ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
     configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2");
     AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
         getAccountName());
+    AbfsClient abfsClient = getFileSystem().getAbfsStore().getClient();
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
     ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
-    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
     int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
     assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
     running = true;
@@ -283,6 +289,87 @@ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
     t.join();
   }
 
+  @Test
+  public void testReadMetricUpdation() throws Exception {
+    Configuration configuration = getReadAheadV2Configuration();
+    configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2");
+    FileSystem fileSystem = FileSystem.newInstance(configuration);
+    try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+      AbfsClient abfsClient = abfs.getAbfsStore().getClient();
+      AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+          getAccountName());
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          abfsConfig.getReadAheadBlockSize(), abfsConfig);
+      ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          abfsConfig.getReadAheadBlockSize(), abfsConfig);
+      ReadBufferManagerV2 bufferManagerV2
+          = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
+
+      // --- Capture initial metrics and stats ---
+      AbfsReadResourceUtilizationMetrics metrics =
+          abfsClient.getAbfsCounters().getAbfsReadResourceUtilizationMetrics();
+
+      ReadBufferManagerV2.ReadThreadPoolStats statsBefore =
+          bufferManagerV2.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad());
+      int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
+      assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+      running = true;
+      Thread t = new Thread(() -> {
+        while (running) {
+          long maxMemory = Runtime.getRuntime().maxMemory();
+          long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+          double usage = (double) usedMemory / maxMemory;
+
+          if (usage < HIGH_MEMORY_USAGE_THRESHOLD_PERCENT) {
+            // Allocate more memory
+            allocations.add(new byte[10 * 1024 * 1024]); // 10MB
+          }
+        }
+      }, "MemoryLoadThread");
+      t.setDaemon(true);
+      t.start();
+      Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+      assertThat(bufferManagerV2.getNumBuffers()).isLessThan(initialBuffers);
+      running = false;
+      t.join();
+
+      ReadBufferManagerV2.ReadThreadPoolStats statsAfter
+          = bufferManagerV2.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad());
+
+      // --- Validate that metrics and stats changed ---
+      Assertions.assertThat(statsAfter)
+          .as("Thread pool stats should update after CPU load")
+          .isNotEqualTo(statsBefore);
+
+      boolean updatedMetrics = metrics.getUpdatedAtLeastOnce();
+
+      Assertions.assertThat(updatedMetrics)
+          .as("Metrics should be updated at least once after CPU load")
+          .isTrue();
+
+      String metricsOutput = metrics.toString();
+
+      // Assertions for metrics correctness
+      Assertions.assertThat(metricsOutput)
+          .as("Metrics output should not be empty")
+          .isNotEmpty();
+
+      Assertions.assertThat(metricsOutput)
+          .as("Metrics must include CPU utilization data")
+          .contains("SC=");
+
+      Assertions.assertThat(metricsOutput)
+          .as("Metrics must include memory utilization data")
+          .contains("AM=");
+
+      Assertions.assertThat(metricsOutput)
+          .as("Metrics must include current thread pool size")
+          .contains("CP=");
+    }
+  }
+
+
   private Configuration getReadAheadV2Configuration() {
     Configuration conf = new Configuration(getRawConfiguration());
     conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);