HADOOP-19737: ABFS: Add metrics to identify improvements with read and write aggressiveness (#8056)
Contributed by Anmol Asrani
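The change also adds two write-path tunables that gate memory-based scaling of the write thread pool.
As an illustrative (not prescriptive) core-site.xml fragment, using the key names and default values introduced below in ConfigurationKeys.java and FileSystemConfigurations.java:

  <property>
    <name>fs.azure.write.high.memory.usage.threshold.percent</name>
    <value>60</value>
  </property>
  <property>
    <name>fs.azure.write.low.memory.usage.threshold.percent</name>
    <value>30</value>
  </property>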
diff --git a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
index f18a2a6..33573b8 100644
--- a/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
+++ b/hadoop-tools/hadoop-azure/src/config/checkstyle-suppressions.xml
@@ -63,6 +63,12 @@
<!-- allow tests to use _ for ordering. -->
<suppress checks="MethodName"
files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]commit[\\/]ITestAbfsTerasort.java"/>
- <suppress checks="ParameterNumber"
- files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
+ <suppress checks="ParameterNumber"
+ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]TestAbfsOutputStream.java"/>
+ <suppress checks="ParameterNumber"
+ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ReadBufferManagerV2.java"/>
+ <suppress checks="ParameterNumber"
+ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]WriteThreadPoolSizeManager.java"/>
+ <suppress checks="ParameterNumber"
+ files="org[\\/]apache[\\/]hadoop[\\/]fs[\\/]azurebfs[\\/]services[\\/]ResourceUtilizationStats.java"/>
</suppressions>
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index e4c88b2..e759129 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -545,7 +545,6 @@ public class AbfsConfiguration{
private int writeMediumCpuThreshold;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT,
- MinValue = MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT,
MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT,
DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT)
private int writeLowCpuThreshold;
@@ -566,6 +565,16 @@ public class AbfsConfiguration{
private int highTierMemoryMultiplier;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT)
+ private int writeHighMemoryUsageThresholdPercent;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT)
+ private int writeLowMemoryUsageThresholdPercent;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue = MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
private int apacheMaxCacheSize;
@@ -1917,6 +1926,14 @@ public int getHighTierMemoryMultiplier() {
return highTierMemoryMultiplier;
}
+ public int getWriteHighMemoryUsageThresholdPercent() {
+ return writeHighMemoryUsageThresholdPercent;
+ }
+
+ public int getWriteLowMemoryUsageThresholdPercent() {
+ return writeLowMemoryUsageThresholdPercent;
+ }
+
public int getMaxWriteRequestsToQueue() {
if (this.maxWriteRequestsToQueue < 1) {
return 2 * getWriteConcurrentRequestCount();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
index 7a94117..8bc7d50 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsBackoffMetrics;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.azurebfs.services.AbfsReadFooterMetrics;
+import org.apache.hadoop.fs.azurebfs.services.AbfsReadResourceUtilizationMetrics;
+import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -106,6 +108,10 @@ public class AbfsCountersImpl implements AbfsCounters {
private AbfsReadFooterMetrics abfsReadFooterMetrics = null;
+ private AbfsWriteResourceUtilizationMetrics abfsWriteResourceUtilizationMetrics = null;
+
+ private AbfsReadResourceUtilizationMetrics abfsReadResourceUtilizationMetrics = null;
+
private AtomicLong lastExecutionTime = null;
private static final AbfsStatistic[] STATISTIC_LIST = {
@@ -169,6 +175,31 @@ public AbfsCountersImpl(URI uri) {
lastExecutionTime = new AtomicLong(now());
}
+ /**
+ * Initializes the metrics collector for the read thread pool.
+ * <p>
+ * This method creates a new instance of {@link AbfsReadResourceUtilizationMetrics}
+ * to track performance statistics and operational metrics related to
+ * read operations executed by the thread pool.
+ * </p>
+ */
+ public void initializeReadResourceUtilizationMetrics() {
+ abfsReadResourceUtilizationMetrics = new AbfsReadResourceUtilizationMetrics();
+ }
+
+ /**
+ * Initializes the metrics collector for the write thread pool.
+ * <p>
+ * This method creates a new instance of {@link AbfsWriteResourceUtilizationMetrics}
+ * to track performance statistics and operational metrics related to
+ * write operations executed by the thread pool.
+ * </p>
+ */
+ public void initializeWriteResourceUtilizationMetrics() {
+ abfsWriteResourceUtilizationMetrics = new AbfsWriteResourceUtilizationMetrics();
+ }
+
+
@Override
public void initializeMetrics(MetricFormat metricFormat) {
switch (metricFormat) {
@@ -268,6 +299,22 @@ public AbfsReadFooterMetrics getAbfsReadFooterMetrics() {
}
/**
+ * Returns the write thread pool metrics instance, or {@code null} if uninitialized.
+ */
+ @Override
+ public AbfsWriteResourceUtilizationMetrics getAbfsWriteResourceUtilizationMetrics() {
+ return abfsWriteResourceUtilizationMetrics;
+ }
+
+ /**
+ * Returns the read thread pool metrics instance, or {@code null} if uninitialized.
+ */
+ @Override
+ public AbfsReadResourceUtilizationMetrics getAbfsReadResourceUtilizationMetrics() {
+ return abfsReadResourceUtilizationMetrics;
+ }
+
+ /**
* {@inheritDoc}
*
* Method to aggregate all the counters in the MetricRegistry and form a
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d3f1ecd..5d7d089 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -205,7 +205,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
- private WriteThreadPoolSizeManager poolSizeManager;
+ private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
/** ABFS instance reference to be held by the store to avoid GC close. */
private BackReference fsBackRef;
@@ -281,11 +281,10 @@ public AzureBlobFileSystemStore(
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
- this.poolSizeManager = WriteThreadPoolSizeManager.getInstance(
+ this.writeThreadPoolSizeManager = WriteThreadPoolSizeManager.getInstance(
getClient().getFileSystem() + "-" + UUID.randomUUID(),
- abfsConfiguration);
- poolSizeManager.startCPUMonitoring();
- this.boundedThreadPool = poolSizeManager.getExecutorService();
+ abfsConfiguration, getClient().getAbfsCounters());
+ this.boundedThreadPool = writeThreadPoolSizeManager.getExecutorService();
} else {
this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
abfsConfiguration.getWriteConcurrentRequestCount(),
@@ -343,7 +342,7 @@ public void close() throws IOException {
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
- IOUtils.cleanupWithLogger(LOG, poolSizeManager, getClientHandler());
+ IOUtils.cleanupWithLogger(LOG, writeThreadPoolSizeManager, getClientHandler());
}
}
@@ -822,6 +821,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withPath(path)
.withExecutorService(new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true))
+ .withWriteThreadPoolManager(writeThreadPoolSizeManager)
.withTracingContext(tracingContext)
.withAbfsBackRef(fsBackRef)
.withIngressServiceType(abfsConfiguration.getIngressServiceType())
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
index d7887b6..24aecb2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
@@ -18,17 +18,8 @@
package org.apache.hadoop.fs.azurebfs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.Closeable;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
-
-import com.sun.management.OperatingSystemMXBean;
-
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -39,19 +30,32 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
+import org.apache.hadoop.fs.azurebfs.services.ResourceUtilizationStats;
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
-import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* Manages a thread pool for writing operations, adjusting the pool size based on CPU utilization.
@@ -78,10 +82,20 @@ public final class WriteThreadPoolSizeManager implements Closeable {
private final String filesystemName;
/* Initial size for the thread pool when created. */
private final int initialPoolSize;
- /* Initially available heap memory. */
- private final long initialAvailableHeapMemory;
/* The configuration instance. */
private final AbfsConfiguration abfsConfiguration;
+ /* Metrics collector for monitoring the performance of the ABFS write thread pool. */
+ private final AbfsWriteResourceUtilizationMetrics writeThreadPoolMetrics;
+ /* Flag indicating if CPU monitoring has started. */
+ private volatile boolean isMonitoringStarted = false;
+ /* Tracks the last scale direction applied, or empty if none. */
+ private volatile String lastScaleDirection = EMPTY_STRING;
+ /* Maximum CPU utilization observed during the monitoring interval. */
+ private volatile double maxJvmCpuUtilization = 0.0;
+ /** High memory usage threshold used to trigger thread pool downscaling. */
+ private final double highMemoryThreshold;
+ /** Low memory usage threshold used to allow thread pool upscaling. */
+ private final double lowMemoryThreshold;
/**
* Private constructor to initialize the write thread pool and CPU monitor executor
@@ -89,16 +103,17 @@ public final class WriteThreadPoolSizeManager implements Closeable {
*
* @param filesystemName Name of the ABFS filesystem.
* @param abfsConfiguration Configuration containing pool size parameters.
+ * @param abfsCounters ABFS counters instance used for metrics.
*/
private WriteThreadPoolSizeManager(String filesystemName,
- AbfsConfiguration abfsConfiguration) {
+ AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
+ /* Retrieves and assigns the write thread pool metrics from the ABFS client counters. */
+ this.writeThreadPoolMetrics = abfsCounters.getAbfsWriteResourceUtilizationMetrics();
this.filesystemName = filesystemName;
this.abfsConfiguration = abfsConfiguration;
int availableProcessors = Runtime.getRuntime().availableProcessors();
- /* Get the heap space available when the instance is created */
- this.initialAvailableHeapMemory = getAvailableHeapMemory();
/* Compute the max pool size */
- int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, initialAvailableHeapMemory);
+ int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors, ResourceUtilizationUtils.getAvailableMaxHeapMemory());
/* Get the initial pool size from config, fallback to at least 1 */
this.initialPoolSize = Math.max(1,
@@ -116,11 +131,13 @@ private WriteThreadPoolSizeManager(String filesystemName,
}
);
ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
- executor.setKeepAliveTime(
- abfsConfiguration.getWriteThreadPoolKeepAliveTime(), TimeUnit.SECONDS);
+ int keepAlive = Math.max(1, abfsConfiguration.getWriteThreadPoolKeepAliveTime());
+ executor.setKeepAliveTime(keepAlive, TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(true);
/* Create a scheduled executor for CPU monitoring and pool adjustment */
this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
+ highMemoryThreshold = abfsConfiguration.getWriteHighMemoryUsageThresholdPercent() / HUNDRED_D;
+ lowMemoryThreshold = abfsConfiguration.getWriteLowMemoryUsageThresholdPercent() / HUNDRED_D;
}
/** Returns the internal {@link AbfsConfiguration}. */
@@ -146,23 +163,13 @@ private int getComputedMaxPoolSize(final int availableProcessors, long initialAv
}
/**
- * Calculates the available heap memory in gigabytes.
- * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap memory
- * allowed for the JVM and subtracts the currently used memory (total - free)
- * to determine how much heap memory is still available.
- * The result is rounded up to the nearest gigabyte.
+ * Determines the maximum thread count based on available heap memory and CPU cores.
+ * Calculates the thread count as {@code availableProcessors × multiplier}, where the
+ * multiplier is selected according to the heap memory tier (low, medium, or high).
*
- * @return the available heap memory in gigabytes
- */
- private long getAvailableHeapMemory() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- long availableHeapBytes = memoryUsage.getMax() - memoryUsage.getUsed();
- return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
- }
-
- /**
- * Returns aggressive thread count = CPU cores × multiplier based on heap tier.
+ * @param availableHeapGB the available heap memory in gigabytes.
+ * @param availableProcessors the number of available CPU cores.
+ * @return the maximum thread count based on memory tier and processor count.
*/
private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessors) {
int multiplier;
@@ -177,15 +184,17 @@ private int getMemoryTierMaxThreads(long availableHeapGB, int availableProcessor
}
/**
- * Returns the singleton instance of WriteThreadPoolSizeManager for the given filesystem.
+ * Returns the singleton {@link WriteThreadPoolSizeManager} instance for the specified filesystem.
+ * If an active instance already exists in the manager map for the given filesystem, it is returned.
+ * Otherwise, a new instance is created, registered in the map, and returned.
*
- * @param filesystemName the name of the filesystem.
- * @param abfsConfiguration the configuration for the ABFS.
- *
- * @return the singleton instance.
+ * @param filesystemName the name of the filesystem.
+ * @param abfsConfiguration the {@link AbfsConfiguration} associated with the filesystem.
+ * @param abfsCounters the {@link AbfsCounters} used to initialize the manager.
+ * @return the singleton {@link WriteThreadPoolSizeManager} instance for the given filesystem.
*/
public static synchronized WriteThreadPoolSizeManager getInstance(
- String filesystemName, AbfsConfiguration abfsConfiguration) {
+ String filesystemName, AbfsConfiguration abfsConfiguration, AbfsCounters abfsCounters) {
/* Check if an instance already exists in the map for the given filesystem */
WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
filesystemName);
@@ -201,7 +210,7 @@ public static synchronized WriteThreadPoolSizeManager getInstance(
"Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
filesystemName);
WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
- filesystemName, abfsConfiguration);
+ filesystemName, abfsConfiguration, abfsCounters);
POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
return newInstance;
}
@@ -232,34 +241,22 @@ private void adjustThreadPoolSize(int newMaxPoolSize) {
/**
* Starts monitoring the CPU utilization and adjusts the thread pool size accordingly.
*/
- synchronized void startCPUMonitoring() {
- cpuMonitorExecutor.scheduleAtFixedRate(() -> {
- double cpuUtilization = getCpuUtilization();
- LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
- try {
- adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
- } catch (InterruptedException e) {
- throw new RuntimeException(String.format(
- "Thread pool size adjustment interrupted for filesystem %s",
- filesystemName), e);
- }
- }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(), TimeUnit.MILLISECONDS);
- }
-
- /**
- * Gets the current system CPU utilization.
- *
- * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if unavailable.
- */
- private double getCpuUtilization() {
- OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
- OperatingSystemMXBean.class);
- double cpuLoad = osBean.getSystemCpuLoad();
- if (cpuLoad < 0) {
- LOG.warn("System CPU load value unavailable (returned -1.0). Defaulting to 0.0.");
- return 0.0;
+ public synchronized void startCPUMonitoring() {
+ if (!isMonitoringStarted()) {
+ isMonitoringStarted = true;
+ cpuMonitorExecutor.scheduleAtFixedRate(() -> {
+ double cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
+ LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
+ try {
+ adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(String.format(
+ "Thread pool size adjustment interrupted for filesystem %s",
+ filesystemName), e);
+ }
+ }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(),
+ TimeUnit.MILLISECONDS);
}
- return cpuLoad;
}
/**
@@ -267,31 +264,68 @@ private double getCpuUtilization() {
* and available heap memory relative to the initially available heap.
*
* @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
- * @throws InterruptedException if thread locking is interrupted
+ * @throws InterruptedException if the resizing operation is interrupted while acquiring the lock
*/
public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws InterruptedException {
lock.lock();
try {
ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
int currentPoolSize = executor.getMaximumPoolSize();
- long currentHeap = getAvailableHeapMemory();
- long initialHeap = initialAvailableHeapMemory;
- LOG.debug("Available heap memory: {} GB, Initial heap memory: {} GB", currentHeap, initialHeap);
- LOG.debug("Current CPU Utilization: {}", cpuUtilization);
-
+ double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
+ LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad, cpuUtilization);
if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) {
- newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, currentHeap, initialHeap);
+ newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize, memoryLoad);
+ if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
+ lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
+ }
} else if (cpuUtilization > (abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) {
- newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, currentHeap, initialHeap);
+ newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize, memoryLoad);
+ if (currentPoolSize == initialPoolSize && newMaxPoolSize == initialPoolSize) {
+ lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
+ }
} else if (cpuUtilization < (abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) {
- newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, currentHeap, initialHeap);
+ newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize, memoryLoad);
+ if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize == maxThreadPoolSize) {
+ lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
+ }
} else {
newMaxPoolSize = currentPoolSize;
LOG.debug("CPU load normal ({}). No change: current={}", cpuUtilization, currentPoolSize);
}
- if (newMaxPoolSize != currentPoolSize) {
+ boolean willResize = newMaxPoolSize != currentPoolSize;
+ if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+ // Update the write thread pool metrics with the latest statistics snapshot.
+ writeThreadPoolMetrics.update(stats);
+ }
+ // Case 1: CPU increased — push metrics ONLY if not resizing
+ if (cpuUtilization > maxJvmCpuUtilization) {
+ maxJvmCpuUtilization = cpuUtilization;
+ if (!willResize) {
+ try {
+ // Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+ // Update the write thread pool metrics with the latest statistics snapshot.
+ writeThreadPoolMetrics.update(stats);
+ } catch (Exception e) {
+ LOG.debug("Error updating write thread pool metrics", e);
+ }
+ }
+ }
+ // Case 2: Resize — always push metrics
+ if (willResize) {
LOG.debug("Resizing thread pool from {} to {}", currentPoolSize, newMaxPoolSize);
+ // Record scale direction
+ lastScaleDirection = (newMaxPoolSize > currentPoolSize) ? SCALE_DIRECTION_UP : SCALE_DIRECTION_DOWN;
adjustThreadPoolSize(newMaxPoolSize);
+ try {
+ // Capture the latest thread pool statistics (pool size, CPU, memory, etc.).
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization, memoryLoad);
+ // Update the write thread pool metrics with the latest statistics snapshot.
+ writeThreadPoolMetrics.update(stats);
+ } catch (Exception e) {
+ LOG.debug("Error updating write thread pool metrics after resizing.", e);
+ }
}
} finally {
lock.unlock();
@@ -299,12 +333,20 @@ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws Interru
}
/**
- * Calculates reduced pool size under high CPU utilization.
+ * Calculates a reduced thread pool size when high CPU utilization is detected.
+ * The reduction strategy depends on available heap memory:
+ * if heap usage is high (low free memory), the pool size is reduced aggressively;
+ * otherwise, it is reduced moderately to prevent resource contention.
+ *
+ * @param currentPoolSize the current size of the thread pool.
+ * @param memoryLoad the current JVM heap load (0.0–1.0)
+ * @return the adjusted (reduced) pool size based on CPU and memory conditions.
*/
- private int calculateReducedPoolSizeHighCPU(int currentPoolSize, long currentHeap, long initialHeap) {
- if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
- LOG.debug("High CPU & low heap. Aggressively reducing: current={}, new={}",
- currentPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
+ private int calculateReducedPoolSizeHighCPU(int currentPoolSize, double memoryLoad) {
+ LOG.debug("The high cpu memory load is {}", memoryLoad);
+ if (memoryLoad > highMemoryThreshold) {
+ LOG.debug("High CPU & high memory load ({}). Aggressive reduction: current={}, new={}",
+ memoryLoad, currentPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
return Math.max(initialPoolSize, currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
}
int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / HIGH_CPU_REDUCTION_FACTOR);
@@ -314,12 +356,21 @@ private int calculateReducedPoolSizeHighCPU(int currentPoolSize, long currentHea
}
/**
- * Calculates reduced pool size under medium CPU utilization.
+ * Calculates a reduced thread pool size when medium CPU utilization is detected.
+ * The reduction is based on available heap memory: if memory is low, the pool size
+ * is reduced more aggressively; otherwise, a moderate reduction is applied to
+ * maintain balanced performance.
+ *
+ * @param currentPoolSize the current size of the thread pool.
+ * @param memoryLoad the current JVM heap load (0.0–1.0)
+ * @return the adjusted (reduced) pool size based on medium CPU and memory conditions.
*/
- private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long currentHeap, long initialHeap) {
- if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
+ private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, double memoryLoad) {
+ LOG.debug("The medium cpu memory load is {}", memoryLoad);
+ if (memoryLoad > highMemoryThreshold) {
int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
- LOG.debug("Medium CPU & low heap. Reducing: current={}, new={}", currentPoolSize, reduced);
+ LOG.debug("Medium CPU & high memory load ({}). Reducing: current={}, new={}",
+ memoryLoad, currentPoolSize, reduced);
return reduced;
}
int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize / MEDIUM_CPU_REDUCTION_FACTOR);
@@ -329,22 +380,29 @@ private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long currentH
}
/**
- * Calculates increased pool size under low CPU utilization.
+ * Calculates an adjusted thread pool size when low CPU utilization is detected.
+ * If sufficient heap memory is available, the pool size is increased to improve throughput.
+ * Otherwise, it is slightly decreased to conserve memory resources.
+ *
+ * @param currentPoolSize the current size of the thread pool.
+ * @param memoryLoad the current JVM heap load (0.0–1.0)
+ * @return the adjusted (increased or decreased) pool size based on CPU and memory conditions.
*/
- private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, long currentHeap, long initialHeap) {
- if (currentHeap >= initialHeap * LOW_CPU_HEAP_FACTOR) {
+ private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize, double memoryLoad) {
+ LOG.debug("The low cpu memory load is {}", memoryLoad);
+ if (memoryLoad <= lowMemoryThreshold) {
int increased = Math.min(maxThreadPoolSize, (int) (currentPoolSize * LOW_CPU_POOL_SIZE_INCREASE_FACTOR));
- LOG.debug("Low CPU & healthy heap. Increasing: current={}, new={}", currentPoolSize, increased);
+ LOG.debug("Low CPU & low memory load ({}). Increasing: current={}, new={}",
+ memoryLoad, currentPoolSize, increased);
return increased;
} else {
// Decrease by 10%
int decreased = Math.max(1, (int) (currentPoolSize * LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
- LOG.debug("Low CPU but insufficient heap ({} GB). Decreasing: current={}, new={}", currentHeap, currentPoolSize, decreased);
+ LOG.debug("Low CPU but insufficient heap. Decreasing: current={}, new={}", currentPoolSize, decreased);
return decreased;
}
}
-
/**
* Returns the executor service for the thread pool.
*
@@ -364,6 +422,26 @@ public ScheduledExecutorService getCpuMonitorExecutor() {
}
/**
+ * Checks if monitoring has started.
+ *
+ * @return true if monitoring has started, false otherwise.
+ */
+ public synchronized boolean isMonitoringStarted() {
+ return isMonitoringStarted;
+ }
+
+ /**
+ * Returns the maximum JVM CPU utilization observed during the current
+ * monitoring interval or since the last reset.
+ *
+ * @return the highest JVM CPU utilization recorded, as a fraction (0.0 to 1.0)
+ */
+ @VisibleForTesting
+ public double getMaxJvmCpuUtilization() {
+ return maxJvmCpuUtilization;
+ }
+
+ /**
* Closes this manager by shutting down executors and cleaning up resources.
* Removes the instance from the active manager map.
*
@@ -394,4 +472,87 @@ public void close() throws IOException {
}
}
}
+
+ /**
+ * Represents current statistics of the write thread pool and system.
+ */
+ public static class WriteThreadPoolStats extends ResourceUtilizationStats {
+
+ /**
+ * Constructs a {@link WriteThreadPoolStats} instance containing thread pool
+ * metrics and JVM/system resource utilization details.
+ *
+ * @param currentPoolSize the current number of threads in the pool
+ * @param maxPoolSize the maximum number of threads permitted in the pool
+ * @param activeThreads the number of threads actively executing tasks
+ * @param idleThreads the number of idle threads in the pool
+ * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
+ * @param systemCpuUtilization the current system-wide CPU utilization (0.0–1.0)
+ * @param availableHeapGB the available heap memory in gigabytes
+ * @param committedHeapGB the committed heap memory in gigabytes
+ * @param usedHeapGB the used heap memory in gigabytes
+ * @param maxHeapGB the maximum heap memory in gigabytes
+ * @param memoryLoad the JVM memory load (used / max)
+ * @param lastScaleDirection the last scaling action recorded: "I" (increase), "D" (decrease),
+ * "-D" (decrease requested while at minimum), "+F" (increase requested while at maximum), or empty if none
+ * @param maxCpuUtilization the peak JVM CPU utilization observed during this interval
+ * @param jvmProcessId the process ID of the JVM
+ */
+ public WriteThreadPoolStats(int currentPoolSize,
+ int maxPoolSize, int activeThreads, int idleThreads,
+ double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
+ double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad, String lastScaleDirection,
+ double maxCpuUtilization, long jvmProcessId) {
+ super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
+ jvmCpuLoad, systemCpuUtilization, availableHeapGB,
+ committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
+ maxCpuUtilization, jvmProcessId);
+ }
+ }
+
+ /**
+ * Returns the latest statistics for the write thread pool and system resources.
+ * The snapshot includes thread counts, JVM and system CPU utilization, and the
+ * current heap usage. These metrics are used for monitoring and making dynamic
+ * sizing decisions for the write thread pool.
+ *
+ * @param jvmCpuUtilization current JVM CPU usage as a fraction (0.0 to 1.0)
+ * @param memoryLoad current JVM memory load (used / max)
+ * @return a {@link WriteThreadPoolStats} object containing the current metrics
+ */
+ synchronized WriteThreadPoolStats getCurrentStats(double jvmCpuUtilization,
+ double memoryLoad) {
+
+ if (boundedThreadPool == null) {
+ return new WriteThreadPoolStats(
+ ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D,
+ ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+ }
+
+ ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;
+
+ String currentScaleDirection = lastScaleDirection;
+ lastScaleDirection = EMPTY_STRING;
+
+ int poolSize = exec.getPoolSize();
+ int activeThreads = exec.getActiveCount();
+ int idleThreads = poolSize - activeThreads;
+
+ return new WriteThreadPoolStats(
+ poolSize, // Current thread count
+ exec.getMaximumPoolSize(), // Max allowed threads
+ activeThreads, // Busy threads
+ idleThreads, // Idle threads
+ jvmCpuUtilization, // JVM CPU usage (ratio)
+ ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage (ratio)
+ ResourceUtilizationUtils.getAvailableHeapMemory(), // Free heap (GB)
+ ResourceUtilizationUtils.getCommittedHeapMemory(), // Committed heap (GB)
+ ResourceUtilizationUtils.getUsedHeapMemory(), // Used heap (GB)
+ ResourceUtilizationUtils.getMaxHeapMemory(), // Max heap (GB)
+ memoryLoad, // used/max
+ currentScaleDirection, // "I", "D", or ""
+ getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far
+ ResourceUtilizationUtils.getJvmProcessId() // JVM PID
+ );
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 91fc97e..3de55ad 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -537,6 +537,16 @@ public static String containerProperty(String property, String fsName, String ac
/** Configuration key for the high-tier memory multiplier for write workloads. Value: {@value}. */
public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = "fs.azure.write.high.tier.memory.multiplier";
+ /**
+ * Threshold percentage of heap usage above which memory pressure is considered high and the write thread pool is scaled down.
+ */
+ public static final String FS_AZURE_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.write.high.memory.usage.threshold.percent";
+
+ /**
+ * Threshold percentage of heap usage below which memory pressure is considered low and the write thread pool may be scaled up.
+ */
+ public static final String FS_AZURE_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.write.low.memory.usage.threshold.percent";
+
/**Flag to enable/disable sending client transactional ID during create/rename operations: {@value}*/
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = "fs.azure.enable.client.transaction.id";
/**Flag to enable/disable create idempotency during create operation: {@value}*/
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 8c5bc22..1be9eca 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -245,6 +245,28 @@ public final class FileSystemConfigurations {
public static final int HUNDRED = 100;
public static final double HUNDRED_D = 100.0;
public static final long THOUSAND = 1000L;
+ // Indicates a successful scale-up operation
+ public static final int SCALE_UP = 1;
+ // Indicates a successful scale-down operation
+ public static final int SCALE_DOWN = -1;
+ // Indicates a down-scale was requested but already at minimum
+ public static final int NO_SCALE_DOWN_AT_MIN = -2;
+ // Indicates an up-scale was requested but already at maximum
+ public static final int NO_SCALE_UP_AT_MAX = 2;
+ // Indicates no scaling action was taken
+ public static final int SCALE_NONE = 0;
+ // Indicates no action is needed based on current metrics
+ public static final int NO_ACTION_NEEDED = 3;
+ // Indicates a successful scale-up operation
+ public static final String SCALE_DIRECTION_UP = "I";
+ // Indicates a successful scale-down operation
+ public static final String SCALE_DIRECTION_DOWN = "D";
+ // Indicates a down-scale was requested but pool is already at minimum
+ public static final String SCALE_DIRECTION_NO_DOWN_AT_MIN = "-D";
+ // Indicates an up-scale was requested but pool is already at maximum
+ public static final String SCALE_DIRECTION_NO_UP_AT_MAX = "+F";
+ // Indicates no scaling action is needed based on current metrics
+ public static final String SCALE_DIRECTION_NO_ACTION_NEEDED = "NA";
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
= HttpOperationType.APACHE_HTTP_CLIENT;
@@ -345,11 +367,6 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 60;
/**
- * Minimum CPU utilization percentage considered as low threshold for write scaling.
- */
- public static final int MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT = 10;
-
- /**
* Maximum CPU utilization percentage considered as low threshold for write scaling.
*/
public static final int MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT = 40;
@@ -389,6 +406,12 @@ public final class FileSystemConfigurations {
*/
public static final int DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 16;
+ /** Percentage threshold of heap usage at which memory pressure is considered high. */
+ public static final int DEFAULT_WRITE_HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 60;
+
+ /** Percentage threshold of heap usage at which memory pressure is considered low. */
+ public static final int DEFAULT_WRITE_LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 30;
+
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID = true;
public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadResourceUtilizationMetricsEnum.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadResourceUtilizationMetricsEnum.java
new file mode 100644
index 0000000..a9d871b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsReadResourceUtilizationMetricsEnum.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Enum representing the set of metrics tracked for the ABFS read thread pool.
+ * Each metric includes a short name used for reporting and its corresponding
+ * {@link StatisticTypeEnum}, which defines how the metric is measured (e.g., gauge).
+ */
+public enum AbfsReadResourceUtilizationMetricsEnum implements
+ AbfsResourceUtilizationMetricsEnum {
+
+ /** Current number of threads in the read thread pool. */
+ CURRENT_POOL_SIZE("CP", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Maximum configured size of the read thread pool. */
+ MAX_POOL_SIZE("MP", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Number of threads currently executing read operations. */
+ ACTIVE_THREADS("AT", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Number of threads currently idle. */
+ IDLE_THREADS("IT", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Recent JVM CPU load value as reported by the JVM (0.0 to 1.0). */
+ JVM_CPU_UTILIZATION("JC", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Overall system-wide CPU utilization percentage during read operations. */
+ SYSTEM_CPU_UTILIZATION("SC", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Available heap memory (in GB) measured during read operations. */
+ AVAILABLE_MEMORY("AM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Committed heap memory (in GB) measured during read operations. */
+ COMMITTED_MEMORY("CM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Used heap memory (in GB) measured during read operations. */
+ USED_MEMORY("UM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Maximum heap memory (in GB) measured during read operations. */
+ MAX_HEAP_MEMORY("MM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** JVM memory load (used heap / max heap) measured during read operations. */
+ MEMORY_LOAD("ML", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Direction of the last scaling decision (e.g., scale-up or scale-down). */
+ LAST_SCALE_DIRECTION("SD", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Maximum CPU utilization recorded during the monitoring interval. */
+ MAX_CPU_UTILIZATION("MC", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** The process ID (PID) of the running JVM, useful for correlating metrics with system-level process information. */
+ JVM_PROCESS_ID("JI", StatisticTypeEnum.TYPE_GAUGE);
+
+ private final String name;
+ private final StatisticTypeEnum statisticType;
+
+ /**
+ * Constructs a metric enum constant with its short name and type.
+ *
+ * @param name the short name or label for the metric.
+ * @param type the {@link StatisticTypeEnum} indicating the metric type.
+ */
+ AbfsReadResourceUtilizationMetricsEnum(String name, StatisticTypeEnum type) {
+ this.name = name;
+ this.statisticType = type;
+ }
+
+ /**
+ * Returns the short name of the metric.
+ *
+ * @return the metric name.
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the {@link StatisticTypeEnum} associated with this metric.
+ *
+ * @return the metric's statistic type.
+ */
+ @Override
+ public StatisticTypeEnum getStatisticType() {
+ return statisticType;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsResourceUtilizationMetricsEnum.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsResourceUtilizationMetricsEnum.java
new file mode 100644
index 0000000..c4ffaaf
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsResourceUtilizationMetricsEnum.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Defines the contract for all ABFS resource-level metric keys.
+ * <p>
+ * Metric enums implementing this interface supply the metric name and its
+ * {@link StatisticTypeEnum} (e.g., gauge or counter), allowing consistent
+ * registration and updates across ABFS metric sources.
+ * </p>
+ */
+public interface AbfsResourceUtilizationMetricsEnum {
+
+ /**
+ * Returns the unique metric name used for registration and reporting.
+ *
+ * @return the metric name
+ */
+ String getName();
+
+ /**
+ * Returns the statistic type associated with this metric
+ * (gauge, counter, etc.).
+ *
+ * @return the metric's {@link StatisticTypeEnum}
+ */
+ StatisticTypeEnum getStatisticType();
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
new file mode 100644
index 0000000..1f92626
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/AbfsWriteResourceUtilizationMetricsEnum.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.enums;
+
+/**
+ * Enum representing the set of metrics tracked for the ABFS write thread pool.
+ * Each metric entry defines a short name identifier and its corresponding
+ * {@link StatisticTypeEnum}, which specifies the type of measurement (e.g., gauge).
+ * These metrics are used for monitoring and analyzing the performance and
+ * resource utilization of the write thread pool.
+ */
+public enum AbfsWriteResourceUtilizationMetricsEnum implements
+ AbfsResourceUtilizationMetricsEnum {
+
+ /** Current number of threads in the write thread pool. */
+ CURRENT_POOL_SIZE("CP", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Maximum configured size of the write thread pool. */
+ MAX_POOL_SIZE("MP", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Number of threads currently executing write operations. */
+ ACTIVE_THREADS("AT", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Number of threads currently idle. */
+ IDLE_THREADS("IT", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Recent JVM CPU load value as reported by the JVM (0.0 to 1.0). */
+ JVM_CPU_UTILIZATION("JC", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Overall system-wide CPU utilization percentage during write operations. */
+ SYSTEM_CPU_UTILIZATION("SC", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Available heap memory (in GB) measured during write operations. */
+ AVAILABLE_MEMORY("AM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Committed heap memory (in GB) measured during write operations. */
+ COMMITTED_MEMORY("CM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Used heap memory (in GB) measured during write operations. */
+ USED_MEMORY("UM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Maximum heap memory (in GB) measured during write operations. */
+ MAX_HEAP_MEMORY("MM", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** JVM memory load (used heap / max heap) measured during write operations. */
+ MEMORY_LOAD("ML", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Direction of the last scaling decision (e.g., scale-up or scale-down). */
+ LAST_SCALE_DIRECTION("SD", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** Maximum CPU utilization recorded during the monitoring interval. */
+ MAX_CPU_UTILIZATION("MC", StatisticTypeEnum.TYPE_GAUGE),
+
+ /** The process ID (PID) of the running JVM, useful for correlating metrics with system-level process information. */
+ JVM_PROCESS_ID("JI", StatisticTypeEnum.TYPE_GAUGE);
+
+ private final String name;
+ private final StatisticTypeEnum statisticType;
+
+ /**
+ * Constructs a metric definition for the ABFS write thread pool.
+ *
+ * @param name the short name identifier for the metric.
+ * @param type the {@link StatisticTypeEnum} describing the metric type.
+ */
+ AbfsWriteResourceUtilizationMetricsEnum(String name, StatisticTypeEnum type) {
+ this.name = name;
+ this.statisticType = type;
+ }
+
+ /**
+ * Returns the short name identifier of the metric.
+ *
+ * @return the metric name.
+ */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Returns the {@link StatisticTypeEnum} associated with this metric.
+ *
+ * @return the metric's statistic type.
+ */
+ @Override
+ public StatisticTypeEnum getStatisticType() {
+ return statisticType;
+ }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 0210dd5..43171ae 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -1334,7 +1334,12 @@ public AbfsRestOperation read(final String path,
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
-
+ // Retrieve the read thread pool metrics from the ABFS counters.
+ AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
+ // If metrics are available, record them in the tracing context for diagnostics or logging.
+ if (readResourceUtilizationMetrics != null) {
+ tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+ }
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.GetBlob,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 74bddff..18e8183 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -312,8 +312,15 @@ private AbfsClient(final URL baseUrl,
metricIdlePeriod,
metricIdlePeriod);
}
+ // Initialize write thread pool metrics if dynamic write thread pool scaling is enabled.
+ if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+ abfsCounters.initializeWriteResourceUtilizationMetrics();
+ }
this.abfsMetricUrl = abfsConfiguration.getMetricUri();
-
+ // Initialize read thread pool metrics if ReadAheadV2 and its dynamic scaling feature are enabled.
+ if (abfsConfiguration.isReadAheadV2Enabled() && abfsConfiguration.isReadAheadV2DynamicScalingEnabled()) {
+ abfsCounters.initializeReadResourceUtilizationMetrics();
+ }
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
abfsConfiguration.getRawConfiguration().getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
IdentityTransformerInterface.class);
@@ -1881,6 +1888,16 @@ protected AbfsRestOperation getSuccessOp(final AbfsRestOperationType operationTy
}
/**
+ * Retrieves the current read thread pool metrics from the ABFS counters.
+ *
+ * @return an {@link AbfsReadResourceUtilizationMetrics} instance containing
+ * the latest statistics for the read thread pool
+ */
+ protected AbfsReadResourceUtilizationMetrics retrieveReadResourceUtilizationMetrics() {
+ return getAbfsCounters().getAbfsReadResourceUtilizationMetrics();
+ }
+
+ /**
* Creates a VersionedFileStatus object from the ListResultEntrySchema.
* @param entry ListResultEntrySchema object.
* @param uri to be used for the path conversion.
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
index 0a94b66..4512db98f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsCounters.java
@@ -83,5 +83,14 @@ String formString(String prefix, String separator, String suffix,
AbfsReadFooterMetrics getAbfsReadFooterMetrics();
+ void initializeReadResourceUtilizationMetrics();
+
+ AbfsReadResourceUtilizationMetrics getAbfsReadResourceUtilizationMetrics();
+
+ void initializeWriteResourceUtilizationMetrics();
+
+ AbfsWriteResourceUtilizationMetrics getAbfsWriteResourceUtilizationMetrics();
+
AtomicLong getLastExecutionTime();
+
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index 3a9244c..cf2449e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -1061,7 +1061,12 @@ public AbfsRestOperation read(final String path,
String sasTokenForReuse = appendSASTokenToQuery(path,
SASTokenProvider.READ_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
-
+ // Retrieve the read thread pool metrics from the ABFS counters.
+ AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics = retrieveReadResourceUtilizationMetrics();
+ // If metrics are available, record them in the tracing context for diagnostics or logging.
+ if (readResourceUtilizationMetrics != null) {
+ tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+ }
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
AbfsRestOperationType.ReadFile,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 9d29614..31b6f0f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -186,7 +186,7 @@ public AbfsInputStream(
if (readAheadV2Enabled) {
ReadBufferManagerV2.setReadBufferManagerConfigs(
readAheadBlockSize, client.getAbfsConfiguration());
- readBufferManager = ReadBufferManagerV2.getBufferManager();
+ readBufferManager = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
} else {
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
readBufferManager = ReadBufferManagerV1.getBufferManager();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 3abf21c..6c68782 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -31,6 +31,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
@@ -167,6 +168,13 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
*/
private MessageDigest fullBlobContentMd5 = null;
+ /**
+ * Instance of {@link WriteThreadPoolSizeManager} used by this class
+ * to dynamically adjust the write thread pool size based on
+ * system resource utilization.
+ */
+ private final WriteThreadPoolSizeManager writeThreadPoolSizeManager;
+
public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
throws IOException {
this.statistics = abfsOutputStreamContext.getStatistics();
@@ -217,6 +225,9 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext)
this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType();
this.clientHandler = abfsOutputStreamContext.getClientHandler();
+ this.writeThreadPoolSizeManager = abfsOutputStreamContext.getWriteThreadPoolSizeManager();
+ // Initialize CPU monitoring if the pool size manager is present
+ initializeMonitoringIfNeeded();
createIngressHandler(serviceTypeAtInit,
abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
try {
@@ -243,6 +254,21 @@ public AzureIngressHandler getIngressHandler() {
private volatile boolean switchCompleted = false;
/**
+ * Starts CPU monitoring in the thread pool size manager if it
+ * is initialized and not already monitoring.
+ */
+ private void initializeMonitoringIfNeeded() {
+ if (writeThreadPoolSizeManager != null && !writeThreadPoolSizeManager.isMonitoringStarted()) {
+ synchronized (this) {
+ // Re-check to avoid a race between threads
+ if (!writeThreadPoolSizeManager.isMonitoringStarted()) {
+ writeThreadPoolSizeManager.startCPUMonitoring();
+ }
+ }
+ }
+ }
+
+ /**
* Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters.
* <p>
* If the `ingressHandler` is already initialized and the switch operation is complete, the existing
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
index ceae24b..68a2ba0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -21,6 +21,7 @@
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -79,6 +80,9 @@ public class AbfsOutputStreamContext extends AbfsStreamContext {
private AbfsClientHandler clientHandler;
+ /** Reference to the thread pool manager. */
+ private WriteThreadPoolSizeManager writeThreadPoolSizeManager;
+
public AbfsOutputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
@@ -228,6 +232,12 @@ public AbfsOutputStreamContext withEncryptionAdapter(
return this;
}
+ public AbfsOutputStreamContext withWriteThreadPoolManager(
+ final WriteThreadPoolSizeManager writeThreadPoolSizeManager) {
+ this.writeThreadPoolSizeManager = writeThreadPoolSizeManager;
+ return this;
+ }
+
public int getWriteBufferSize() {
return writeBufferSize;
}
@@ -328,6 +338,10 @@ public AbfsClientHandler getClientHandler() {
return clientHandler;
}
+ public WriteThreadPoolSizeManager getWriteThreadPoolSizeManager() {
+ return writeThreadPoolSizeManager;
+ }
+
/**
* Checks if small write is supported based on the current configuration.
*
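The new withWriteThreadPoolManager setter follows the existing builder-style API of AbfsOutputStreamContext. A minimal wiring sketch, assuming the caller already holds a WriteThreadPoolSizeManager instance (variable names are illustrative):

    // Illustrative wiring only; the real population happens where the output stream is created.
    AbfsOutputStreamContext context =
        new AbfsOutputStreamContext(sasTokenRenewPeriodForStreamsInSeconds)
            .withWriteThreadPoolManager(writeThreadPoolSizeManager);
    // AbfsOutputStream later reads it back via context.getWriteThreadPoolSizeManager().
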
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
new file mode 100644
index 0000000..a38dd08
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
@@ -0,0 +1,88 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+
+import org.apache.hadoop.fs.azurebfs.enums.AbfsReadResourceUtilizationMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+
+/**
+ * Metrics container for the ABFS read thread pool.
+ * <p>
+ * This class captures thread-pool sizing, CPU utilization, memory usage,
+ * scaling direction, and other runtime statistics reported by
+ * {@link ReadBufferManagerV2.ReadThreadPoolStats}.
+ * </p>
+ */
+public class AbfsReadResourceUtilizationMetrics
+ extends
+ AbstractAbfsResourceUtilizationMetrics<AbfsReadResourceUtilizationMetricsEnum> {
+
+ /**
+ * Creates a metrics set for read operations, initializing all
+ * metric keys defined in {@link AbfsReadResourceUtilizationMetricsEnum}.
+ */
+ public AbfsReadResourceUtilizationMetrics() {
+ super(AbfsReadResourceUtilizationMetricsEnum.values(), FSOperationType.READ.toString());
+ }
+
+ /**
+ * Updates all read-thread-pool metrics using the latest stats snapshot.
+ * <p>
+ * Each value from {@link ReadBufferManagerV2.ReadThreadPoolStats} is
+ * mapped to the corresponding metric, including:
+ * </p>
+ * <ul>
+ * <li>Thread pool size (current, max, active, idle)</li>
+ * <li>JVM and system CPU load (converted to percentage)</li>
+ * <li>Available and committed memory</li>
+ * <li>Memory load percentage</li>
+ * <li>Scaling direction</li>
+ * <li>Maximum CPU utilization observed</li>
+ * <li>JVM process ID</li>
+ * </ul>
+ *
+ * @param stats the latest read-thread-pool statistics; ignored if {@code null}
+ */
+ public synchronized void update(ReadBufferManagerV2.ReadThreadPoolStats stats) {
+ if (stats == null) {
+ return;
+ }
+
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.CURRENT_POOL_SIZE, stats.getCurrentPoolSize());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_POOL_SIZE, stats.getMaxPoolSize());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.ACTIVE_THREADS, stats.getActiveThreads());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.IDLE_THREADS, stats.getIdleThreads());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, stats.getMemoryUtilization());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.COMMITTED_MEMORY, stats.getCommittedHeapGB());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.USED_MEMORY, stats.getUsedHeapGB());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, stats.getMaxHeapGB());
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
+ stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_PROCESS_ID, stats.getJvmProcessId());
+
+ markUpdated();
+ }
+}
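A minimal usage sketch of this container, assuming a ReadThreadPoolStats snapshot named stats has been produced by ReadBufferManagerV2:

    AbfsReadResourceUtilizationMetrics metrics = new AbfsReadResourceUtilizationMetrics();
    metrics.update(stats);                  // map the snapshot onto the registered gauges
    String serialized = metrics.toString(); // compact form for tracing; empty if nothing changed since the last push
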
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
new file mode 100644
index 0000000..bace442
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+
+import org.apache.hadoop.fs.azurebfs.enums.AbfsWriteResourceUtilizationMetricsEnum;
+import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
+import org.apache.hadoop.fs.azurebfs.WriteThreadPoolSizeManager;
+
+/**
+ * Metrics container for the ABFS write thread pool.
+ * <p>
+ * This class records pool size, CPU utilization, memory usage,
+ * scaling direction, and other runtime indicators reported by
+ * {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}.
+ * </p>
+ */
+public class AbfsWriteResourceUtilizationMetrics
+ extends
+ AbstractAbfsResourceUtilizationMetrics<AbfsWriteResourceUtilizationMetricsEnum> {
+
+ /**
+ * Creates a metrics set for write operations, pre-initializing
+ * all metric keys defined in {@link AbfsWriteResourceUtilizationMetricsEnum}.
+ */
+ public AbfsWriteResourceUtilizationMetrics() {
+ super(AbfsWriteResourceUtilizationMetricsEnum.values(), FSOperationType.WRITE.toString());
+ }
+
+ /**
+ * Updates all write-thread-pool metrics using the latest stats snapshot.
+ * Each field in {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}
+ * is mapped to a corresponding metric.
+ *
+ * @param stats the latest thread-pool statistics; ignored if {@code null}
+ */
+ public synchronized void update(WriteThreadPoolSizeManager.WriteThreadPoolStats stats) {
+ if (stats == null) {
+ return;
+ }
+
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.CURRENT_POOL_SIZE, stats.getCurrentPoolSize());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_POOL_SIZE, stats.getMaxPoolSize());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.ACTIVE_THREADS, stats.getActiveThreads());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.IDLE_THREADS, stats.getIdleThreads());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION, stats.getJvmCpuLoad() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION, stats.getSystemCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.AVAILABLE_MEMORY, stats.getMemoryUtilization());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.COMMITTED_MEMORY, stats.getCommittedHeapGB());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.USED_MEMORY, stats.getUsedHeapGB());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY, stats.getMaxHeapGB());
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD, stats.getMemoryLoad() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
+ stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION, stats.getMaxCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_PROCESS_ID, stats.getJvmProcessId());
+
+ markUpdated();
+ }
+}
+
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
new file mode 100644
index 0000000..2e5a172
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
+import org.apache.hadoop.fs.azurebfs.enums.AbfsResourceUtilizationMetricsEnum;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
+
+/**
+ * Abstract base class for tracking ABFS resource metrics, handling metric registration,
+ * updates, versioning, and compact serialization for diagnostics.
+ *
+ * @param <T> enum type implementing {@link AbfsResourceUtilizationMetricsEnum}
+ */
+public abstract class AbstractAbfsResourceUtilizationMetrics<T extends Enum<T> & AbfsResourceUtilizationMetricsEnum>
+ extends AbstractAbfsStatisticsSource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractAbfsResourceUtilizationMetrics.class);
+
+ /**
+ * Tracks whether any metric has been updated at least once.
+ */
+ private final AtomicBoolean updatedAtLeastOnce = new AtomicBoolean(false);
+
+ /**
+ * A version counter incremented each time a metric update occurs.
+ * Used to detect whether metrics have changed since the last serialization.
+ */
+ private final AtomicLong updateVersion = new AtomicLong(0);
+
+ /**
+ * The last version number that was serialized and pushed out.
+ */
+ private final AtomicLong lastPushedVersion = new AtomicLong(-1);
+
+ /**
+ * The set of metrics supported by this metrics instance.
+ */
+ private final T[] metrics;
+
+ /**
+ * A short identifier describing the operation or subsystem these metrics represent.
+ * This prefix appears in the serialized results string.
+ */
+ private final String operationType;
+
+ /**
+ * Constructs the resource metrics abstraction.
+ * Registers gauges (and later counters) with the Hadoop {@link IOStatisticsStore}
+ * based on the metric enum values.
+ *
+ * @param metrics all metric enum constants supported by this instance
+ * @param operationType a short label used as the prefix when serializing metrics
+ */
+ protected AbstractAbfsResourceUtilizationMetrics(T[] metrics, String operationType) {
+ this.metrics = metrics;
+ this.operationType = operationType;
+
+ IOStatisticsStore store = iostatisticsStore()
+ .withGauges(getMetricNames(StatisticTypeEnum.TYPE_GAUGE))
+ .build();
+ setIOStatistics(store);
+ }
+
+ /**
+ * Extracts the names of metrics of the specified type.
+ *
+ * @param type the type of metrics to return (e.g., gauge, counter)
+ * @return an array of metric names of the given type
+ */
+ private String[] getMetricNames(StatisticTypeEnum type) {
+ return Arrays.stream(metrics)
+ .filter(m -> m.getStatisticType().equals(type))
+ .flatMap(m -> Stream.of(m.getName()))
+ .toArray(String[]::new);
+ }
+
+ /**
+ * Sets the value of a metric using its configured statistic type.
+ * <ul>
+ * <li>For {@code TYPE_GAUGE}, the value overwrites the existing gauge value.</li>
+ * <li>For {@code TYPE_COUNTER}, the value increments the counter.</li>
+ * </ul>
+ *
+ * @param metric the metric to update
+ * @param value the numeric value to assign or increment
+ */
+ protected void setMetricValue(T metric, double value) {
+ switch (metric.getStatisticType()) {
+ case TYPE_GAUGE:
+ setGaugeValue(metric.getName(), (long) value);
+ break;
+ case TYPE_COUNTER:
+ setCounterValue(metric.getName(), (long) value);
+ break;
+ default:
+ LOG.warn("Unsupported metric type: {}", metric.getStatisticType());
+ }
+ }
+
+ /**
+ * Marks that a metric update has occurred.
+ * Increments the version so consumers know that new data is available.
+ */
+ protected void markUpdated() {
+ updatedAtLeastOnce.set(true);
+ updateVersion.incrementAndGet();
+ }
+
+ /**
+ * Returns a flag indicating whether any metric has been updated since initialization.
+ *
+ * @return the {@link AtomicBoolean} tracking whether at least one update occurred
+ */
+ public boolean getUpdatedAtLeastOnce() {
+ return updatedAtLeastOnce.get();
+ }
+
+ /**
+ * Serializes the current metrics to a compact string format suitable for logs.
+ * @return a serialized metrics string or an empty string if no updates occurred
+ */
+ @Override
+ public String toString() {
+ if (!updatedAtLeastOnce.get()) {
+ return EMPTY_STRING;
+ }
+
+ long currentVersion = updateVersion.get();
+ if (currentVersion == lastPushedVersion.get()) {
+ return EMPTY_STRING;
+ }
+
+ synchronized (this) {
+ if (currentVersion == lastPushedVersion.get()) {
+ return EMPTY_STRING;
+ }
+
+ StringBuilder sb = new StringBuilder(operationType).append(CHAR_EQUALS);
+
+ for (T metric : metrics) {
+ sb.append(metric.getName())
+ .append(CHAR_EQUALS)
+ .append(lookupGaugeValue(metric.getName()))
+ .append(CHAR_DOLLAR);
+ }
+
+ lastPushedVersion.set(currentVersion);
+ return sb.toString();
+ }
+ }
+}
+
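Given the serialization loop above, a pushed metrics string is the operation type followed by name=value pairs separated by the dollar delimiter; schematically (metric names come from the enum and the values are placeholders):

    <operationType>=<METRIC_NAME>=<value>$<METRIC_NAME>=<value>$...$<METRIC_NAME>=<value>$
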
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
index 8610add..98ef0bc 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
@@ -125,6 +125,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
TracingContext tracingContextAppend = new TracingContext(tracingContext);
tracingContextAppend.setIngressHandler(BLOB_APPEND + " T " + threadIdStr);
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
+ // Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
+ AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
+ if (writeResourceUtilizationMetrics != null) {
+ tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ }
try {
LOG.trace("Starting remote write for block with ID {} and offset {}",
blockToUpload.getBlockId(), blockToUpload.getOffset());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
index 96a8f07..ccf5080 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
@@ -117,6 +117,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
AppendRequestParameters reqParams,
TracingContext tracingContext) throws IOException {
TracingContext tracingContextAppend = new TracingContext(tracingContext);
+ // Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
+ AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
+ if (writeResourceUtilizationMetrics != null) {
+ tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ }
String threadIdStr = String.valueOf(Thread.currentThread().getId());
if (tracingContextAppend.getIngressHandler().equals(EMPTY_STRING)) {
tracingContextAppend.setIngressHandler(DFS_APPEND + " T " + threadIdStr);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
index cfa0131..4664f1f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
@@ -110,6 +110,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
TracingContext tracingContext) throws IOException {
AbfsRestOperation op;
TracingContext tracingContextAppend = new TracingContext(tracingContext);
+ // Fetches write thread pool metrics from the ABFS client and adds them to the tracing context.
+ AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics = getWriteResourceUtilizationMetrics();
+ if (writeResourceUtilizationMetrics != null) {
+ tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ }
String threadIdStr = String.valueOf(Thread.currentThread().getId());
tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " + threadIdStr);
tracingContextAppend.setPosition(String.valueOf(blockToUpload.getOffset()));
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
index 81007e1..48f7058 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureIngressHandler.java
@@ -236,4 +236,15 @@ protected String computeFullBlobMd5() {
}
return fullBlobMd5;
}
+
+ /**
+ * Helper that returns the write thread-pool metrics from the client's counters, if available.
+ *
+ * @return the {@link AbfsWriteResourceUtilizationMetrics} instance or {@code null} when not present
+ */
+ protected AbfsWriteResourceUtilizationMetrics getWriteResourceUtilizationMetrics() {
+ return getAbfsOutputStream().getClient()
+ .getAbfsCounters()
+ .getAbfsWriteResourceUtilizationMetrics();
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 7943755..d64c724 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -20,12 +20,7 @@
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import com.sun.management.OperatingSystemMXBean;
-
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.lang.management.MemoryUsage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -43,11 +38,20 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.concurrent.SubjectInheritingThread;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* The Improved Read Buffer Manager for Rest AbfsClient.
@@ -105,14 +109,33 @@ public final class ReadBufferManagerV2 extends ReadBufferManager {
private static AtomicBoolean isConfigured = new AtomicBoolean(false);
+ /* Metrics collector for monitoring the performance of the ABFS read thread pool. */
+ private final AbfsReadResourceUtilizationMetrics readThreadPoolMetrics;
+ /* The ABFSCounters instance for updating read buffer manager related metrics. */
+ private final AbfsCounters abfsCounters;
+ /* Tracks the last scale direction applied, or empty if none. */
+ private volatile String lastScaleDirection = EMPTY_STRING;
+ /* Maximum CPU utilization observed during the monitoring interval. */
+ private volatile double maxJvmCpuUtilization = 0.0;
+
/**
* Private constructor to prevent instantiation as this needs to be singleton.
+ *
+ * @param abfsCounters the {@link AbfsCounters} used for managing read operations.
*/
- private ReadBufferManagerV2() {
+ private ReadBufferManagerV2(AbfsCounters abfsCounters) {
+ this.abfsCounters = abfsCounters;
+ readThreadPoolMetrics = abfsCounters.getAbfsReadResourceUtilizationMetrics();
printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
}
- static ReadBufferManagerV2 getBufferManager() {
+ /**
+ * Returns the singleton instance of {@code ReadBufferManagerV2}.
+ *
+ * @param abfsCounters the {@link AbfsCounters} used for read operations.
+ * @return the singleton instance of {@code ReadBufferManagerV2}.
+ */
+ static ReadBufferManagerV2 getBufferManager(AbfsCounters abfsCounters) {
if (!isConfigured.get()) {
throw new IllegalStateException("ReadBufferManagerV2 is not configured. "
+ "Please call setReadBufferManagerConfigs() before calling getBufferManager().");
@@ -121,7 +144,7 @@ static ReadBufferManagerV2 getBufferManager() {
LOCK.lock();
try {
if (bufferManager == null) {
- bufferManager = new ReadBufferManagerV2();
+ bufferManager = new ReadBufferManagerV2(abfsCounters);
bufferManager.init();
LOGGER.trace("ReadBufferManagerV2 singleton initialized");
}
@@ -211,7 +234,7 @@ void init() {
workerThreadFactory);
workerPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < minThreadPoolSize; i++) {
- ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager(abfsCounters));
workerRefs.add(worker);
workerPool.submit(worker);
}
@@ -743,7 +766,7 @@ private synchronized boolean tryMemoryUpscale() {
printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
return false; // Dynamic scaling is disabled, so no upscaling.
}
- double memoryLoad = getMemoryLoad();
+ double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
// Create and Add more buffers in getFreeList().
int nextIndx = getNumBuffers();
@@ -789,7 +812,7 @@ > getThresholdAgeMilliseconds()) {
}
}
- double memoryLoad = getMemoryLoad();
+ double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
synchronized (this) {
if (isFreeListEmpty()) {
@@ -831,7 +854,10 @@ private boolean manualEviction(final ReadBuffer buf) {
*/
private void adjustThreadPool() {
int currentPoolSize = workerRefs.size();
- double cpuLoad = getCpuLoad();
+ double cpuLoad = ResourceUtilizationUtils.getJvmCpuLoad();
+ if (cpuLoad > maxJvmCpuUtilization) {
+ maxJvmCpuUtilization = cpuLoad;
+ }
int requiredPoolSize = getRequiredThreadPoolSize();
int newThreadPoolSize;
printTraceLog(
@@ -843,29 +869,62 @@ private void adjustThreadPool() {
(int) Math.ceil(
(currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
/ HUNDRED_D));
- // Create new Worker Threads
- for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
- ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
- workerRefs.add(worker);
- workerPool.submit(worker);
+ if (currentPoolSize == maxThreadPoolSize || newThreadPoolSize == currentPoolSize) {
+ lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX; // Already full, cannot scale up
+ } else {
+ lastScaleDirection = SCALE_DIRECTION_UP;
}
+ // Create new Worker Threads
+ if (SCALE_DIRECTION_UP.equals(lastScaleDirection)) {
+ for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager(abfsCounters));
+ workerRefs.add(worker);
+ workerPool.submit(worker);
+ }
+ }
+ // Capture the latest thread pool statistics (pool size, CPU, memory, etc.)
+ ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+ // Update the read thread pool metrics with the latest statistics snapshot.
+ readThreadPoolMetrics.update(stats);
printTraceLog("Increased worker pool size from {} to {}", currentPoolSize,
newThreadPoolSize);
+ } else if (cpuLoad < cpuThreshold && currentPoolSize > requiredPoolSize) {
+ lastScaleDirection = SCALE_DIRECTION_NO_ACTION_NEEDED;
+ // Capture the latest thread pool statistics (pool size, CPU, memory, etc.)
+ ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+ // Update the read thread pool metrics with the latest statistics snapshot.
+ readThreadPoolMetrics.update(stats);
} else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
newThreadPoolSize = Math.max(minThreadPoolSize,
(int) Math.ceil(
(currentPoolSize * (HUNDRED_D - threadPoolDownscalePercentage))
/ HUNDRED_D));
- // Signal the extra workers to stop
- while (workerRefs.size() > newThreadPoolSize) {
- ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
- worker.stop();
+ if (currentPoolSize == minThreadPoolSize || newThreadPoolSize == currentPoolSize) {
+ lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN; // Already at minimum, cannot scale down
+ } else {
+ lastScaleDirection = SCALE_DIRECTION_DOWN;
}
+ if (SCALE_DIRECTION_DOWN.equals(lastScaleDirection)) {
+ // Signal the extra workers to stop
+ while (workerRefs.size() > newThreadPoolSize) {
+ ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+ worker.stop();
+ }
+ }
+ // Capture the latest thread pool statistics (pool size, CPU, memory, etc.)
+ ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+ // Update the read thread pool metrics with the latest statistics snapshot.
+ readThreadPoolMetrics.update(stats);
printTraceLog("Decreased worker pool size from {} to {}", currentPoolSize,
newThreadPoolSize);
} else {
+ lastScaleDirection = EMPTY_STRING;
printTraceLog("No change in worker pool size. CPU load: {} Pool size: {}",
cpuLoad, currentPoolSize);
+ if (cpuLoad >= maxJvmCpuUtilization) {
+ ReadThreadPoolStats stats = getCurrentStats(cpuLoad);
+ readThreadPoolMetrics.update(stats); // publish snapshot
+ }
}
}
@@ -995,33 +1054,6 @@ private void printDebugLog(String message, Object... args) {
LOGGER.debug(message, args);
}
- /**
- * Get the current memory load of the JVM.
- * @return the memory load as a double value between 0.0 and 1.0
- */
- @VisibleForTesting
- double getMemoryLoad() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- return (double) memoryUsage.getUsed() / memoryUsage.getMax();
- }
-
- /**
- * Get the current CPU load of the system.
- * @return the CPU load as a double value between 0.0 and 1.0
- */
- @VisibleForTesting
- public double getCpuLoad() {
- OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
- OperatingSystemMXBean.class);
- double cpuLoad = osBean.getSystemCpuLoad();
- if (cpuLoad < 0) {
- // If the CPU load is not available, return 0.0
- return 0.0;
- }
- return cpuLoad;
- }
-
@VisibleForTesting
synchronized static ReadBufferManagerV2 getInstance() {
return bufferManager;
@@ -1057,6 +1089,17 @@ public ScheduledExecutorService getCpuMonitoringThread() {
return cpuMonitorThread;
}
+ /**
+ * Returns the maximum JVM CPU utilization observed during the current
+ * monitoring interval or since the last reset.
+ *
+ * @return the highest JVM CPU utilization percentage recorded
+ */
+ @VisibleForTesting
+ public double getMaxJvmCpuUtilization() {
+ return maxJvmCpuUtilization;
+ }
+
public int getRequiredThreadPoolSize() {
return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
* (getReadAheadQueue().size()
@@ -1097,4 +1140,84 @@ private void incrementActiveBufferCount() {
private void decrementActiveBufferCount() {
numberOfActiveBuffers.getAndDecrement();
}
+
+ /**
+ * Represents current statistics of the read thread pool and system.
+ */
+ public static class ReadThreadPoolStats extends ResourceUtilizationStats {
+
+ /**
+ * Constructs a {@link ReadThreadPoolStats} instance containing thread pool
+ * metrics and JVM/system resource utilization details.
+ *
+ * @param currentPoolSize the current number of threads in the pool
+ * @param maxPoolSize the maximum number of threads permitted in the pool
+ * @param activeThreads the number of threads actively executing tasks
+ * @param idleThreads the number of idle threads in the pool
+ * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
+ * @param systemCpuUtilization the current system-wide CPU utilization (0.0–1.0)
+ * @param availableHeapGB the available heap memory in gigabytes
+ * @param committedHeapGB the committed heap memory in gigabytes
+ * @param usedHeapGB the used heap memory in gigabytes
+ * @param maxHeapGB the maximum heap memory in gigabytes
+ * @param memoryLoad the JVM memory load (used / max)
+ * @param lastScaleDirection the last scaling action performed: "I" (increase),
+ * "D" (decrease), or empty if no scaling occurred
+ * @param maxCpuUtilization the peak JVM CPU utilization observed during this interval
+ * @param jvmProcessId the process ID of the JVM
+ */
+ public ReadThreadPoolStats(int currentPoolSize,
+ int maxPoolSize, int activeThreads, int idleThreads,
+ double jvmCpuLoad,
+ double systemCpuUtilization, double availableHeapGB,
+ double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad,
+ String lastScaleDirection, double maxCpuUtilization, long jvmProcessId) {
+ super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
+ jvmCpuLoad, systemCpuUtilization, availableHeapGB,
+ committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad, lastScaleDirection,
+ maxCpuUtilization, jvmProcessId);
+ }
+ }
+
+ /**
+ * Creates and returns a snapshot of the current read thread pool and system metrics.
+ * This method captures live values such as pool size, active threads, JVM CPU load,
+ * system CPU utilization, available heap memory, and the maximum CPU utilization
+ * observed during the current interval.
+ *
+ * @param jvmCpuLoad the current JVM process CPU load (0.0 to 1.0)
+ * @return a {@link ReadThreadPoolStats} object containing the current thread pool
+ * and system resource statistics
+ */
+ synchronized ReadThreadPoolStats getCurrentStats(double jvmCpuLoad) {
+ if (workerPool == null) {
+ return new ReadThreadPoolStats(ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D,
+ ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+ }
+
+ ThreadPoolExecutor exec = this.workerPool;
+ String currentScaleDirection = lastScaleDirection;
+ lastScaleDirection = EMPTY_STRING;
+
+ int poolSize = exec.getPoolSize();
+ int activeThreads = exec.getActiveCount();
+ int idleThreads = poolSize - activeThreads;
+
+ return new ReadThreadPoolStats(
+ poolSize, // Current thread count
+ exec.getMaximumPoolSize(), // Max allowed threads
+ activeThreads, // Busy threads
+ idleThreads, // Idle threads
+ jvmCpuLoad, // JVM CPU usage (ratio)
+ ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage (ratio)
+ ResourceUtilizationUtils.getAvailableHeapMemory(), // Free heap (GB)
+ ResourceUtilizationUtils.getCommittedHeapMemory(), // Committed heap (GB)
+ ResourceUtilizationUtils.getUsedHeapMemory(), // Used heap (GB)
+ ResourceUtilizationUtils.getMaxHeapMemory(), // Max heap (GB)
+ ResourceUtilizationUtils.getMemoryLoad(), // used/max
+ currentScaleDirection, // "I", "D", or ""
+ getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far,
+ ResourceUtilizationUtils.getJvmProcessId() // JVM process id.
+ );
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
new file mode 100644
index 0000000..4fa0712
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_ACTION_NEEDED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_ACTION_NEEDED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_DOWN_AT_MIN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DOWN;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_NONE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_UP;
+
+/**
+ * Represents current statistics of the thread pool and system.
+ */
+public abstract class ResourceUtilizationStats {
+
+ private final int currentPoolSize; // Current number of threads in the pool
+ private final int maxPoolSize; // Maximum allowed pool size
+ private final int activeThreads; // Number of threads currently executing tasks
+ private final int idleThreads; // Number of threads not executing tasks
+ private final double jvmCpuLoad; // Current JVM process CPU load (0.0 to 1.0)
+ private final double systemCpuUtilization; // Current system CPU load (0.0 to 1.0)
+ private final double availableHeapGB; // Available heap memory (GB)
+ private final double committedHeapGB; // Total committed heap memory (GB)
+ private final double usedHeapGB; // Used heap memory (GB)
+ private final double maxHeapGB; // Max heap memory (GB)
+ private final double memoryLoad; // Heap usage ratio (used/max)
+ private final String lastScaleDirection; // Last resize direction: "I" (increase), "D" (decrease), or a no-action marker
+ private final double maxCpuUtilization; // Peak JVM CPU observed in the current interval
+ private final long jvmProcessId; // JVM Process ID
+
+ /**
+ * Constructs a {@link ResourceUtilizationStats} instance containing thread pool
+ * metrics and JVM/system resource utilization details.
+ *
+ * @param currentPoolSize the current number of threads in the pool
+ * @param maxPoolSize the maximum number of threads permitted in the pool
+ * @param activeThreads the number of threads actively executing tasks
+ * @param idleThreads the number of idle threads in the pool
+ * @param jvmCpuLoad the current JVM CPU load (0.0–1.0)
+ * @param systemCpuUtilization the current system-wide CPU utilization (0.0–1.0)
+ * @param availableHeapGB the available heap memory in gigabytes
+ * @param committedHeapGB the committed heap memory in gigabytes
+ * @param usedHeapGB the used heap memory in gigabytes
+ * @param maxHeapGB the maximum heap memory in gigabytes
+ * @param memoryLoad the JVM memory load (used / max)
+ * @param lastScaleDirection the last scaling action performed: "I" (increase),
+ * "D" (decrease), or empty if no scaling occurred
+ * @param maxCpuUtilization the peak JVM CPU utilization observed during this interval
+ * @param jvmProcessId the process ID of the JVM
+ */
+ public ResourceUtilizationStats(int currentPoolSize,
+ int maxPoolSize, int activeThreads, int idleThreads,
+ double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
+ double committedHeapGB, double usedHeapGB, double maxHeapGB, double memoryLoad, String lastScaleDirection,
+ double maxCpuUtilization, long jvmProcessId) {
+ this.currentPoolSize = currentPoolSize;
+ this.maxPoolSize = maxPoolSize;
+ this.activeThreads = activeThreads;
+ this.idleThreads = idleThreads;
+ this.jvmCpuLoad = jvmCpuLoad;
+ this.systemCpuUtilization = systemCpuUtilization;
+ this.availableHeapGB = availableHeapGB;
+ this.committedHeapGB = committedHeapGB;
+ this.usedHeapGB = usedHeapGB;
+ this.maxHeapGB = maxHeapGB;
+ this.memoryLoad = memoryLoad;
+ this.lastScaleDirection = lastScaleDirection;
+ this.maxCpuUtilization = maxCpuUtilization;
+ this.jvmProcessId = jvmProcessId;
+ }
+
+ /** @return the current number of threads in the pool. */
+ public int getCurrentPoolSize() {
+ return currentPoolSize;
+ }
+
+ /** @return the maximum allowed size of the thread pool. */
+ public int getMaxPoolSize() {
+ return maxPoolSize;
+ }
+
+ /** @return the number of threads currently executing tasks. */
+ public int getActiveThreads() {
+ return activeThreads;
+ }
+
+ /** @return the number of threads currently idle. */
+ public int getIdleThreads() {
+ return idleThreads;
+ }
+
+ /** @return the overall system CPU load as a value between 0.0 and 1.0. */
+ public double getSystemCpuUtilization() {
+ return systemCpuUtilization;
+ }
+
+ /** @return the available heap memory in gigabytes. */
+ public double getMemoryUtilization() {
+ return availableHeapGB;
+ }
+
+ /** @return the total committed heap memory in gigabytes */
+ public double getCommittedHeapGB() {
+ return committedHeapGB;
+ }
+
+ /** @return the used heap memory in gigabytes */
+ public double getUsedHeapGB() {
+ return usedHeapGB;
+ }
+
+ /** @return the max heap memory in gigabytes */
+ public double getMaxHeapGB() {
+ return maxHeapGB;
+ }
+
+ /** @return the current JVM memory load (used / max) as a value between 0.0 and 1.0 */
+ public double getMemoryLoad() {
+ return memoryLoad;
+ }
+
+ /** @return the last scale direction marker: "I" (increase), "D" (decrease), a no-action marker, or empty. */
+ public String getLastScaleDirection() {
+ return lastScaleDirection;
+ }
+
+ /** @return the JVM process CPU load as a value between 0.0 and 1.0. */
+ public double getJvmCpuLoad() {
+ return jvmCpuLoad;
+ }
+
+ /** @return the maximum JVM process CPU load observed, as a value between 0.0 and 1.0. */
+ public double getMaxCpuUtilization() {
+ return maxCpuUtilization;
+ }
+
+ /** @return the JVM process ID. */
+ public long getJvmProcessId() {
+ return jvmProcessId;
+ }
+
+ /**
+ * Converts the scale direction marker into a numeric code.
+ *
+ * @param lastScaleDirection the scale direction marker ("I", "D", a no-action marker, or empty)
+ *
+ * @return the numeric code for scale up, scale down, no-up-at-max, no-down-at-min, no-action-needed, or none
+ */
+ public int getLastScaleDirectionNumeric(String lastScaleDirection) {
+ switch (lastScaleDirection) {
+ case SCALE_DIRECTION_UP:
+ return SCALE_UP; // Scaled up
+ case SCALE_DIRECTION_DOWN:
+ return SCALE_DOWN; // Scaled down
+ case SCALE_DIRECTION_NO_DOWN_AT_MIN:
+ return NO_SCALE_DOWN_AT_MIN; // Attempted down-scale, already at minimum
+ case SCALE_DIRECTION_NO_UP_AT_MAX:
+ return NO_SCALE_UP_AT_MAX; // Attempted up-scale, already at maximum
+ case SCALE_DIRECTION_NO_ACTION_NEEDED:
+ return NO_ACTION_NEEDED; // No action needed
+ default:
+ return SCALE_NONE; // No scaling
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "currentPoolSize=%d, maxPoolSize=%d, activeThreads=%d, idleThreads=%d, "
+ + "jvmCpuLoad=%.2f%%, systemCpuUtilization=%.2f%%, "
+ + "availableHeap=%.2fGB, committedHeap=%.2fGB, memoryLoad=%.2f%%, "
+ + "scaleDirection=%s, maxCpuUtilization=%.2f%%, jvmProcessId=%d",
+ currentPoolSize, maxPoolSize, activeThreads,
+ idleThreads, jvmCpuLoad * HUNDRED_D, systemCpuUtilization * HUNDRED_D,
+ availableHeapGB, committedHeapGB, memoryLoad * HUNDRED_D,
+ lastScaleDirection, maxCpuUtilization * HUNDRED_D, jvmProcessId
+ );
+ }
+}
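For orientation, toString() renders a snapshot roughly like the following (values are illustrative):

    currentPoolSize=8, maxPoolSize=32, activeThreads=5, idleThreads=3, jvmCpuLoad=42.50%, systemCpuUtilization=61.20%, availableHeap=4.00GB, committedHeap=6.00GB, memoryLoad=55.00%, scaleDirection=I, maxCpuUtilization=78.90%, jvmProcessId=12345
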
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
new file mode 100644
index 0000000..c151a48
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+
+import com.sun.management.OperatingSystemMXBean;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
+
+/**
+ * Utility class for retrieving JVM- and system-level resource utilization
+ * metrics such as CPU load, memory usage, and available heap memory.
+ */
+public final class ResourceUtilizationUtils {
+
+ private ResourceUtilizationUtils() {
+ // Prevent instantiation
+ }
+
+ /**
+ * Calculates the available heap memory in gigabytes.
+ * This method uses the JVM's {@link MemoryMXBean} heap usage and subtracts the
+ * currently used heap memory from the committed heap memory
+ * to determine how much of the committed heap is still available.
+ * The result is rounded up to the nearest gigabyte.
+ *
+ * @return the available heap memory in gigabytes
+ */
+ public static long getAvailableHeapMemory() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ long availableHeapBytes = memoryUsage.getCommitted() - memoryUsage.getUsed();
+ return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ }
+
+ /**
+ * Returns the currently committed JVM heap memory in gigabytes.
+ * This reflects the amount of heap the JVM has reserved from the OS and may grow as needed.
+ *
+ * @return committed heap memory in gigabytes
+ */
+ @VisibleForTesting
+ public static double getCommittedHeapMemory() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ return (double) memoryUsage.getCommitted() / BYTES_PER_GIGABYTE;
+ }
+
+ /**
+ * Get the current CPU load of the system.
+ * @return the CPU load as a double value between 0.0 and 1.0
+ */
+ @VisibleForTesting
+ public static double getSystemCpuLoad() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+ OperatingSystemMXBean.class);
+ double cpuLoad = osBean.getSystemCpuLoad();
+ if (cpuLoad < 0) {
+ // If the CPU load is not available, return 0.0
+ return 0.0;
+ }
+ return cpuLoad;
+ }
+
+
+ /**
+ * Gets the current JVM process CPU utilization.
+ *
+ * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if unavailable.
+ */
+ @VisibleForTesting
+ public static double getJvmCpuLoad() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+ OperatingSystemMXBean.class);
+ double cpuLoad = osBean.getProcessCpuLoad();
+ if (cpuLoad < ZERO) {
+ return ZERO_D;
+ }
+ return cpuLoad;
+ }
+
+ /**
+ * Get the current memory load of the JVM.
+ * @return the memory load as a double value between 0.0 and 1.0
+ */
+ @VisibleForTesting
+ public static double getMemoryLoad() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ }
+
+ /**
+ * Calculates the used heap memory in gigabytes.
+ * This method returns the amount of heap memory currently used by the JVM.
+ * The result is rounded up to the nearest gigabyte.
+ *
+ * @return the used heap memory in gigabytes
+ */
+ public static long getUsedHeapMemory() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ long usedHeapBytes = memoryUsage.getUsed();
+ return (usedHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ }
+
+ /**
+ * Calculates the maximum heap memory allowed for the JVM in gigabytes.
+ * This is the upper bound the JVM may expand its heap to.
+ *
+ * @return the maximum heap memory in gigabytes
+ */
+ public static long getMaxHeapMemory() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ long maxHeapBytes = memoryUsage.getMax();
+ return (maxHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ }
+
+
+ /**
+ * Returns the process ID (PID) of the currently running JVM.
+ * This method uses {@link ProcessHandle#current()} to obtain the ID of the
+ * Java process.
+ *
+ * @return the PID of the current JVM process
+ */
+ public static long getJvmProcessId() {
+ return ProcessHandle.current().pid();
+ }
+
+ /**
+ * Calculates the available max heap memory in gigabytes.
+ * This method uses the JVM's {@link MemoryMXBean} heap usage to obtain the maximum heap memory
+ * allowed for the JVM and subtracts the currently used heap memory
+ * to determine how much heap memory could still be allocated before the maximum is reached.
+ * The result is rounded up to the nearest gigabyte.
+ *
+ * @return the available heap memory in gigabytes
+ */
+ public static long getAvailableMaxHeapMemory() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ long availableHeapBytes = memoryUsage.getMax() - memoryUsage.getUsed();
+ return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ }
+}
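A short sketch of how this utility surface can be queried; all values are environment-dependent and the snippet is illustrative only:

    double jvmCpu   = ResourceUtilizationUtils.getJvmCpuLoad();          // JVM process CPU, 0.0 to 1.0
    double sysCpu   = ResourceUtilizationUtils.getSystemCpuLoad();       // system CPU, 0.0 to 1.0
    double memLoad  = ResourceUtilizationUtils.getMemoryLoad();          // used heap / max heap
    long freeHeapGb = ResourceUtilizationUtils.getAvailableHeapMemory(); // committed minus used, rounded up to GB
    long pid        = ResourceUtilizationUtils.getJvmProcessId();        // current JVM PID
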
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
index 62ac76f..8decba9 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java
@@ -69,8 +69,8 @@ public class TracingContext {
private String ingressHandler = EMPTY_STRING;
private String position = EMPTY_STRING; // position of read/write in remote file
private String metricResults = EMPTY_STRING;
- private String metricHeader = EMPTY_STRING;
private ReadType readType = ReadType.UNKNOWN_READ;
+ private String resourceUtilizationMetricResults = EMPTY_STRING;
/**
* If {@link #primaryRequestId} is null, this field shall be set equal
@@ -129,6 +129,14 @@ public TracingContext(String clientCorrelationID, String fileSystemID,
this.metricResults = metricResults;
}
+ public TracingContext(String clientCorrelationID, String fileSystemID,
+ FSOperationType opType, boolean needsPrimaryReqId,
+ TracingHeaderFormat tracingHeaderFormat, Listener listener,
+ String metricResults, String resourceUtilizationMetricResults) {
+ this(clientCorrelationID, fileSystemID, opType, needsPrimaryReqId,
+ tracingHeaderFormat, listener, metricResults);
+ this.resourceUtilizationMetricResults = resourceUtilizationMetricResults;
+ }
public TracingContext(TracingContext originalTracingContext) {
this.fileSystemID = originalTracingContext.fileSystemID;
@@ -146,7 +154,9 @@ public TracingContext(TracingContext originalTracingContext) {
}
this.metricResults = originalTracingContext.metricResults;
this.readType = originalTracingContext.readType;
+ this.resourceUtilizationMetricResults = originalTracingContext.resourceUtilizationMetricResults;
}
+
public static String validateClientCorrelationID(String clientCorrelationID) {
if ((clientCorrelationID.length() > MAX_CLIENT_CORRELATION_ID_LENGTH)
|| (!clientCorrelationID.matches(CLIENT_CORRELATION_ID_PATTERN))) {
@@ -226,28 +236,22 @@ public void constructHeader(AbfsHttpOperation httpOperation, String previousFail
+ position + COLON
+ operatedBlobCount + COLON
+ getOperationSpecificHeader(opType) + COLON
- + httpOperation.getTracingContextSuffix();
-
- metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING;
+ + httpOperation.getTracingContextSuffix() + COLON
+ + metricResults + COLON + resourceUtilizationMetricResults;
break;
case TWO_ID_FORMAT:
header = TracingHeaderVersion.getCurrentVersion() + COLON
+ clientCorrelationID + COLON + clientRequestId;
- metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING;
break;
default:
//case SINGLE_ID_FORMAT
header = TracingHeaderVersion.getCurrentVersion() + COLON
+ clientRequestId;
- metricHeader += !(metricResults.trim().isEmpty()) ? metricResults : EMPTY_STRING;
}
if (listener != null) { //for testing
listener.callTracingHeaderValidator(header, format);
}
httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_CLIENT_REQUEST_ID, header);
- if (!metricHeader.equals(EMPTY_STRING)) {
- httpOperation.setRequestProperty(HttpHeaderConfigurations.X_MS_FECLIENT_METRICS, metricHeader);
- }
/*
* In case the primaryRequestId is an empty-string and if it is the first try to
* API call (previousFailure shall be null), maintain the last part of clientRequestId's
@@ -398,4 +402,12 @@ public void setReadType(ReadType readType) {
public ReadType getReadType() {
return readType;
}
+
+ /**
+ * Sets the resource utilization metric results string used for tracing or logging.
+ * @param resourceUtilizationMetricResults the formatted metric data to store.
+ */
+ public void setResourceUtilizationMetricResults(final String resourceUtilizationMetricResults) {
+ this.resourceUtilizationMetricResults = resourceUtilizationMetricResults;
+ }
}
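For reference, a hedged usage sketch of the new eight-argument TracingContext constructor is shown below; all values are placeholders and the exact metric serialization is defined elsewhere in this patch.

    // Illustrative only: carries both aggregated metrics and the new
    // resource-utilization metrics field introduced by this change.
    TracingContext tc = new TracingContext(
        "client-corr-id",                  // clientCorrelationID
        "filesystem-id",                   // fileSystemID
        FSOperationType.WRITE,             // opType
        true,                              // needsPrimaryReqId
        TracingHeaderFormat.ALL_ID_FORMAT, // tracingHeaderFormat
        null,                              // listener (used only by tests)
        "aggregated-metrics",              // metricResults (placeholder)
        "SC=1:AM=2:CP=4");                 // resourceUtilizationMetricResults (placeholder)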
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
index 4e5a2d6..6ce0299 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingHeaderVersion.java
@@ -38,7 +38,16 @@ public enum TracingHeaderVersion {
* :primaryRequestId:streamId:opType:retryHeader:ingressHandler
* :position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
*/
- V1("v1", 13);
+ V1("v1", 13),
+ /**
+ * Version 2 of the tracing header, which includes a version prefix and has 15 permanent fields.
+ * This version is used for the current tracing header schema.
+ * Schema: version:clientCorrelationId:clientRequestId:fileSystemId
+ * :primaryRequestId:streamId:opType:retryHeader:ingressHandler
+ * :position:operatedBlobCount:operationSpecificHeader:httpOperationHeader
+ * :aggregatedMetrics:resourceUtilizationMetrics
+ */
+ V2("v2", 15);
private final String versionString;
private final int fieldCount;
@@ -59,7 +68,7 @@ public String toString() {
* @return the latest version of the tracing header.
*/
public static TracingHeaderVersion getCurrentVersion() {
- return V1;
+ return V2;
}
public int getFieldCount() {
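Since the v2 header remains a single colon-delimited string, its field count can be sanity-checked against getFieldCount(). A small sketch with placeholder field values:

    // Illustrative only: 15 colon-separated fields for a V2 header.
    String header = "v2:corrId:reqId:fsId:primaryReqId:streamId:OP:retry:handler"
        + ":pos:blobCount:opHeader:httpHeader:aggMetrics:resourceMetrics";
    int fieldCount = header.split(":", -1).length;
    // true, since getCurrentVersion() now returns V2 with a field count of 15
    System.out.println(fieldCount == TracingHeaderVersion.getCurrentVersion().getFieldCount());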
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
index 3d4c9aa..a332c30 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
@@ -36,10 +36,19 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+import org.apache.hadoop.fs.azurebfs.services.AbfsWriteResourceUtilizationMetrics;
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_WRITE_MAX_CONCURRENT_REQUESTS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -49,6 +58,7 @@ class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
private AbfsConfiguration mockConfig;
private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
+ private static final int LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 100;
private static final int THREAD_SLEEP_DURATION_MS = 200;
private static final String TEST_FILE_PATH = "testFilePath";
private static final String TEST_DIR_PATH = "testDirPath";
@@ -95,19 +105,25 @@ public void setUp() {
when(mockConfig.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
when(mockConfig.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
when(mockConfig.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
+ when(mockConfig.getWriteLowMemoryUsageThresholdPercent()).thenReturn(LOW_MEMORY_USAGE_THRESHOLD_PERCENT);
}
/**
- * Ensures that {@link WriteThreadPoolSizeManager#getInstance(String, AbfsConfiguration)} returns a singleton per key.
+ * Verifies that {@link WriteThreadPoolSizeManager#getInstance(String, AbfsConfiguration, AbfsCounters)}
+ * returns the same singleton instance for the same filesystem name, and a different instance
+ * for a different filesystem name.
*/
@Test
- void testGetInstanceReturnsSingleton() {
+ void testGetInstanceReturnsSingleton() throws IOException {
WriteThreadPoolSizeManager instance1
- = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+ = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
+ getFileSystem().getAbfsClient().getAbfsCounters());
WriteThreadPoolSizeManager instance2
- = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+ = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig,
+ getFileSystem().getAbfsClient().getAbfsCounters());
WriteThreadPoolSizeManager instance3 =
- WriteThreadPoolSizeManager.getInstance("newFs", mockConfig);
+ WriteThreadPoolSizeManager.getInstance("newFs", mockConfig,
+ getFileSystem().getAbfsClient().getAbfsCounters());
Assertions.assertThat(instance1)
.as("Expected the same singleton instance for the same key")
.isSameAs(instance2);
@@ -117,30 +133,37 @@ void testGetInstanceReturnsSingleton() {
}
/**
- /**
* Tests that high CPU usage results in thread pool downscaling.
*/
@Test
void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, IOException {
- // Get the executor service (ThreadPoolExecutor)
- WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testfsHigh",
- getAbfsStore(getFileSystem()).getAbfsConfiguration());
- ExecutorService executor = instance.getExecutorService();
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+ // Initialize filesystem and thread pool manager
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+ // Get the executor service (ThreadPoolExecutor)
+ WriteThreadPoolSizeManager instance
+ = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+ getAbfsStore(abfs).getAbfsConfiguration(),
+ abfs.getAbfsClient().getAbfsCounters());
+ ExecutorService executor = instance.getExecutorService();
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
- // Simulate high CPU usage (e.g., 95% CPU utilization)
- int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
- instance.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_UTILIZATION_THRESHOLD); // High CPU
+ // Simulate high CPU usage (e.g., 95% CPU utilization)
+ int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
+ instance.adjustThreadPoolSizeBasedOnCPU(
+ HIGH_CPU_UTILIZATION_THRESHOLD); // High CPU
- // Get the new maximum pool size after adjustment
- int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
+ // Get the new maximum pool size after adjustment
+ int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
- // Assert that the pool size has decreased or is equal to initial PoolSize based on high CPU usage
- Assertions.assertThat(newMaxSize)
- .as("Expected pool size to decrease under high CPU usage")
- .isLessThanOrEqualTo(initialMaxSize);
- instance.close();
+ // Assert that the pool size has decreased or is equal to initial PoolSize based on high CPU usage
+ Assertions.assertThat(newMaxSize)
+ .as("Expected pool size to decrease under high CPU usage")
+ .isLessThanOrEqualTo(initialMaxSize);
+ instance.close();
+ }
}
/**
@@ -149,17 +172,24 @@ void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException, IOExc
@Test
void testAdjustThreadPoolSizeBasedOnLowCPU()
throws InterruptedException, IOException {
- WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testfsLow",
- getAbfsStore(getFileSystem()).getAbfsConfiguration());
- ExecutorService executor = instance.getExecutorService();
- int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
- instance.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_UTILIZATION_THRESHOLD); // Low CPU
- int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
- Assertions.assertThat(newSize)
- .as("Expected pool size to increase or stay the same under low CPU usage")
- .isGreaterThanOrEqualTo(initialSize);
- instance.close();
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+ WriteThreadPoolSizeManager instance
+ = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+ getAbfsStore(abfs).getAbfsConfiguration(),
+ abfs.getAbfsClient().getAbfsCounters());
+ ExecutorService executor = instance.getExecutorService();
+ int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
+ instance.adjustThreadPoolSizeBasedOnCPU(
+ LOW_CPU_UTILIZATION_THRESHOLD); // Low CPU
+ int newSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
+ Assertions.assertThat(newSize)
+ .as("Expected pool size to increase or stay the same under low CPU usage")
+ .isGreaterThanOrEqualTo(initialSize);
+ instance.close();
+ }
}
@@ -169,7 +199,8 @@ void testAdjustThreadPoolSizeBasedOnLowCPU()
@Test
void testExecutorServiceIsNotNull() throws IOException {
WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig);
+ = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig,
+ getFileSystem().getAbfsClient().getAbfsCounters());
ExecutorService executor = instance.getExecutorService();
Assertions.assertThat(executor).as("Executor service should be initialized")
.isNotNull();
@@ -186,7 +217,8 @@ void testExecutorServiceIsNotNull() throws IOException {
@Test
void testCloseCleansUp() throws Exception {
WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig);
+ = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig,
+ getFileSystem().getAbfsClient().getAbfsCounters());
ExecutorService executor = instance.getExecutorService();
instance.close();
Assertions.assertThat(executor.isShutdown() || executor.isTerminated())
@@ -206,7 +238,8 @@ void testStartCPUMonitoringSchedulesTask()
throws InterruptedException, IOException {
// Create a new instance of WriteThreadPoolSizeManager using a mock configuration
WriteThreadPoolSizeManager instance
- = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig);
+ = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig,
+ getFileSystem().getAbfsClient().getAbfsCounters());
// Call startCPUMonitoring to schedule the monitoring task
instance.startCPUMonitoring();
@@ -241,7 +274,8 @@ void testABFSWritesUnderCPUStress() throws Exception {
// Initialize the filesystem and thread pool manager
AzureBlobFileSystem fs = getFileSystem();
WriteThreadPoolSizeManager instance =
- WriteThreadPoolSizeManager.getInstance(getFileSystemName(), getConfiguration());
+ WriteThreadPoolSizeManager.getInstance(getFileSystemName(),
+ getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
ThreadPoolExecutor executor =
(ThreadPoolExecutor) instance.getExecutorService();
@@ -323,7 +357,8 @@ void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
// Initialize filesystem and thread pool manager
AzureBlobFileSystem fs = getFileSystem();
WriteThreadPoolSizeManager mgr =
- WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig);
+ WriteThreadPoolSizeManager.getInstance(getFileSystemName(), mockConfig,
+ getFileSystem().getAbfsClient().getAbfsCounters());
ThreadPoolExecutor executor = (ThreadPoolExecutor) mgr.getExecutorService();
// Enable monitoring (may not be required if adjust() is triggered internally)
@@ -498,7 +533,8 @@ void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
try (FileSystem fileSystem = FileSystem.newInstance(getRawConfiguration())) {
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
WriteThreadPoolSizeManager instance =
- WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(), getConfiguration());
+ WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+ getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
ThreadPoolExecutor executor =
(ThreadPoolExecutor) instance.getExecutorService();
@@ -602,7 +638,7 @@ void testScalesDownOnParallelHighMemoryLoad() throws Exception {
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
WriteThreadPoolSizeManager instance =
WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- getConfiguration());
+ getConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
ThreadPoolExecutor executor =
(ThreadPoolExecutor) instance.getExecutorService();
@@ -717,7 +753,7 @@ void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
getRawConfiguration())) {
AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
WriteThreadPoolSizeManager instance = WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- abfs.getAbfsStore().getAbfsConfiguration());
+ abfs.getAbfsStore().getAbfsConfiguration(), getFileSystem().getAbfsClient().getAbfsCounters());
ThreadPoolExecutor executor =
(ThreadPoolExecutor) instance.getExecutorService();
@@ -766,5 +802,101 @@ void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
instance.close();
}
}
+
+ /**
+ * Verifies that when the system experiences low CPU usage,
+ * the WriteThreadPoolSizeManager maintains the thread pool size
+ * without scaling down and updates the corresponding
+ * write thread pool metrics accordingly.
+ */
+ @Test
+ void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
+ throws Exception {
+ // Initialize filesystem and thread pool manager
+ Configuration conf = getRawConfiguration();
+ conf.setBoolean(FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT, true);
+ conf.setInt(AZURE_WRITE_MAX_CONCURRENT_REQUESTS, 2);
+ conf.setInt(FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT, 10);
+ conf.setInt(FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
+ FileSystem fileSystem = FileSystem.newInstance(conf);
+ try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+ WriteThreadPoolSizeManager instance =
+ WriteThreadPoolSizeManager.getInstance("fs1",
+ abfs.getAbfsStore().getAbfsConfiguration(),
+ abfs.getAbfsClient().getAbfsCounters());
+ instance.startCPUMonitoring();
+
+ // --- Capture initial metrics and stats ---
+ AbfsWriteResourceUtilizationMetrics metrics =
+ abfs.getAbfsClient()
+ .getAbfsCounters()
+ .getAbfsWriteResourceUtilizationMetrics();
+
+ WriteThreadPoolSizeManager.WriteThreadPoolStats statsBefore =
+ instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(), ResourceUtilizationUtils.getMemoryLoad());
+
+ ThreadPoolExecutor executor =
+ (ThreadPoolExecutor) instance.getExecutorService();
+
+ // No CPU hogs this time — simulate light CPU load
+ // Submit lightweight ABFS tasks that barely use CPU
+ int taskCount = 10;
+ CountDownLatch latch = new CountDownLatch(taskCount);
+
+ for (int i = 0; i < taskCount; i++) {
+ executor.submit(() -> {
+ try {
+ // Light operations — minimal CPU load
+ for (int j = 0; j < 3; j++) {
+ Thread.sleep(HUNDRED); // simulate idle/light wait
+ }
+ } catch (Exception e) {
+ Assertions.fail("Light task failed unexpectedly", e);
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ // Wait for all tasks to finish
+ boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ Assertions.assertThat(finished)
+ .as("All lightweight tasks should complete normally")
+ .isTrue();
+
+ // Allow some time for monitoring and metrics update
+ Thread.sleep(SLEEP_DURATION_30S_MS);
+
+ WriteThreadPoolSizeManager.WriteThreadPoolStats statsAfter =
+ instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(), ResourceUtilizationUtils.getMemoryLoad());
+
+ // --- Validate that metrics and stats changed ---
+ Assertions.assertThat(statsAfter)
+ .as("Thread pool stats should update after CPU load")
+ .isNotEqualTo(statsBefore);
+
+ String metricsOutput = metrics.toString();
+
+ if (!metricsOutput.isEmpty()) {
+ // Assertions for metrics correctness
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics output should not be empty")
+ .isNotEmpty();
+
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics must include CPU utilization data")
+ .contains("SC=");
+
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics must include memory utilization data")
+ .contains("AM=");
+
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics must include current thread pool size")
+ .contains("CP=");
+ }
+ instance.close();
+ }
+ }
}
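The new test only asserts that the SC=/AM=/CP= tokens are present in the metrics string. A hedged sketch of how such tokens could be parsed back into key/value pairs follows, assuming a colon-delimited KEY=VALUE format, which may not match the real serialization exactly.

    import java.util.LinkedHashMap;
    import java.util.Map;

    final class MetricsTokenParserSketch {
      // Parses "KEY=VALUE" tokens separated by ':' into an ordered map.
      static Map<String, String> parse(String metrics) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String token : metrics.split(":")) {
          int eq = token.indexOf('=');
          if (eq > 0) {
            values.put(token.substring(0, eq), token.substring(eq + 1));
          }
        }
        return values;
      }

      public static void main(String[] args) {
        // Sample input only; real strings come from AbfsWriteResourceUtilizationMetrics.
        Map<String, String> parsed = parse("SC=2:AM=3:CP=8");
        System.out.println(parsed.get("CP")); // prints 8 for this sample
      }
    }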
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
index a5fbe30..4bf1f56 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java
@@ -124,6 +124,8 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
public static final int REDUCED_BACKOFF_INTERVAL = 100;
public static final int BUFFER_LENGTH = 5;
public static final int BUFFER_OFFSET = 0;
+ private static final String RANDOM_URI = "abcd";
+ private static final String RANDOM_FILESYSTEM_ID = "abcde";
private final Pattern userAgentStringPattern;
@@ -174,7 +176,7 @@ public ITestAbfsClient(HttpOperationType pHttpOperationType) throws Exception {
private String getUserAgentString(AbfsConfiguration config,
boolean includeSSLProvider) throws IOException, URISyntaxException {
- AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build();
AbfsClient client;
if (AbfsServiceType.DFS.equals(config.getFsConfiguredServiceType())) {
@@ -411,7 +413,7 @@ public static AbfsClient createTestClientFromCurrentContext(
AbfsPerfTracker tracker = new AbfsPerfTracker("test",
abfsConfig.getAccountName(),
abfsConfig);
- AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
AbfsClientContext abfsClientContext =
new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
@@ -468,7 +470,7 @@ public static AbfsClient createBlobClientFromCurrentContext(
AbfsPerfTracker tracker = new AbfsPerfTracker("test",
abfsConfig.getAccountName(),
abfsConfig);
- AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
AbfsClientContext abfsClientContext =
new AbfsClientContextBuilder().withAbfsPerfTracker(tracker)
@@ -500,7 +502,7 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
AbfsConfiguration abfsConfig) throws Exception {
AuthType currentAuthType = abfsConfig.getAuthType(
abfsConfig.getAccountName());
- AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI(RANDOM_URI)));
assumeThat(currentAuthType)
.as("Auth type must be SharedKey or OAuth for this test")
@@ -750,8 +752,8 @@ public void testExpectHundredContinue() throws Exception {
Mockito.nullable(int.class), Mockito.nullable(int.class),
Mockito.any());
- TracingContext tracingContext = Mockito.spy(new TracingContext("abcd",
- "abcde", FSOperationType.APPEND,
+ TracingContext tracingContext = Mockito.spy(new TracingContext(RANDOM_URI,
+ RANDOM_FILESYSTEM_ID, FSOperationType.APPEND,
TracingHeaderFormat.ALL_ID_FORMAT, null));
// Check that expect header is enabled before the append call.
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
index a0248ee..db5f596 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestApacheClientConnectionPool.java
@@ -149,7 +149,7 @@ public void testApacheClientFallbackDuringConnectionWarmup()
.build(),
new AbfsHttpClientConnectionFactory(), keepAliveCache,
new AbfsConfiguration(new Configuration(), EMPTY_STRING),
- new URL("https://test.com"), true);
+ new URL("https://abcd.com"), true);
Assertions.assertThat(AbfsApacheHttpClient.usable())
.describedAs("Apache HttpClient should be not usable")
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index 84b0fbd..e663db9 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -172,7 +172,7 @@ private ReadBufferManager getBufferManager(AzureBlobFileSystem fs) {
if (getConfiguration().isReadAheadV2Enabled()) {
ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize,
getConfiguration());
- return ReadBufferManagerV2.getBufferManager();
+ return ReadBufferManagerV2.getBufferManager(fs.getAbfsStore().getClient().getAbfsCounters());
}
ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize);
return ReadBufferManagerV1.getBufferManager();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 4a902a8..93df652 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -194,7 +194,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
return inputStream;
}
- void queueReadAheads(AbfsInputStream inputStream) {
+ void queueReadAheads(AbfsInputStream inputStream) throws IOException {
// Mimic AbfsInputStream readAhead queue requests
getBufferManager()
.queueReadAhead(inputStream, 0, ONE_KB, inputStream.getTracingContext());
@@ -1215,7 +1215,8 @@ private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
return fs;
}
- private void resetReadBufferManager(int bufferSize, int threshold) {
+ private void resetReadBufferManager(int bufferSize, int threshold)
+ throws IOException {
getBufferManager()
.testResetReadBufferManager(bufferSize, threshold);
// Trigger GC as aggressive recreation of ReadBufferManager buffers
@@ -1223,11 +1224,11 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
System.gc();
}
- private ReadBufferManager getBufferManager() {
+ private ReadBufferManager getBufferManager() throws IOException {
if (getConfiguration().isReadAheadV2Enabled()) {
ReadBufferManagerV2.setReadBufferManagerConfigs(
getConfiguration().getReadAheadBlockSize(), getConfiguration());
- return ReadBufferManagerV2.getBufferManager();
+ return ReadBufferManagerV2.getBufferManager(getFileSystem().getAbfsStore().getClient().getAbfsCounters());
}
return ReadBufferManagerV1.getBufferManager();
}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index 146eed8..2129f5e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
+import java.net.URI;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@@ -31,6 +32,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -111,7 +113,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
*/
@Test
public void verifyShortWriteRequest() throws Exception {
-
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -122,6 +124,8 @@ public void verifyShortWriteRequest() throws Exception {
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.getAbfsConfiguration()).thenReturn(abfsConf);
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
when(client.append(anyString(), any(byte[].class),
any(AppendRequestParameters.class), any(),
any(), any(TracingContext.class)))
@@ -178,7 +182,7 @@ public void verifyShortWriteRequest() throws Exception {
*/
@Test
public void verifyWriteRequest() throws Exception {
-
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -192,7 +196,8 @@ public void verifyWriteRequest() throws Exception {
TracingContext tracingContext = new TracingContext("test-corr-id",
"test-fs-id", FSOperationType.WRITE,
TracingHeaderFormat.ALL_ID_FORMAT, null);
-
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class), any(AppendRequestParameters.class), any(), any(), any(TracingContext.class))).thenReturn(op);
when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), any(), isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
@@ -256,7 +261,7 @@ public void verifyWriteRequest() throws Exception {
*/
@Test
public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
-
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -278,6 +283,8 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
when(op.getResult()).thenReturn(httpOp);
when(clientHandler.getClient(any())).thenReturn(client);
when(clientHandler.getDfsClient()).thenReturn(client);
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsOutputStream out = Mockito.spy(Mockito.spy(new AbfsOutputStream(
@@ -337,7 +344,7 @@ public void verifyWriteRequestOfBufferSizeAndClose() throws Exception {
*/
@Test
public void verifyWriteRequestOfBufferSize() throws Exception {
-
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -359,6 +366,8 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
when(op.getResult()).thenReturn(httpOp);
when(clientHandler.getClient(any())).thenReturn(client);
when(clientHandler.getDfsClient()).thenReturn(client);
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
populateAbfsOutputStreamContext(
@@ -402,7 +411,7 @@ public void verifyWriteRequestOfBufferSize() throws Exception {
*/
@Test
public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
-
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -421,6 +430,8 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
isNull(), any(), any(TracingContext.class), anyString())).thenReturn(op);
when(clientHandler.getClient(any())).thenReturn(client);
when(clientHandler.getDfsClient()).thenReturn(client);
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
populateAbfsOutputStreamContext(
BUFFER_SIZE,
@@ -466,7 +477,7 @@ public void verifyWriteRequestOfBufferSizeWithAppendBlob() throws Exception {
*/
@Test
public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
-
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -480,7 +491,8 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
TracingContext tracingContext = new TracingContext(
abfsConf.getClientCorrelationId(), "test-fs-id",
FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), null);
-
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class),
any(AppendRequestParameters.class), any(), any(), any(TracingContext.class)))
@@ -547,7 +559,7 @@ public void verifyWriteRequestOfBufferSizeAndHFlush() throws Exception {
*/
@Test
public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
-
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
AbfsDfsClient client = mock(AbfsDfsClient.class);
AbfsRestOperation op = mock(AbfsRestOperation.class);
@@ -556,6 +568,8 @@ public void verifyWriteRequestOfBufferSizeAndFlush() throws Exception {
conf.set(accountKey1, accountValue1);
abfsConf = new AbfsConfiguration(conf, accountName1);
when(client.getAbfsConfiguration()).thenReturn(abfsConf);
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.getAbfsCounters().getAbfsWriteResourceUtilizationMetrics()).thenReturn(new AbfsWriteResourceUtilizationMetrics());
AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, abfsConf);
when(client.getAbfsPerfTracker()).thenReturn(tracker);
when(client.append(anyString(), any(byte[].class),
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
index 77fbb76..77ae7ff 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +29,7 @@
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
@@ -61,17 +63,18 @@ public TestReadBufferManagerV2() throws Exception {
*/
@Test
public void testReadBufferManagerV2Init() throws Exception {
+ AbfsClient abfsClient = getFileSystem().getAbfsStore().getClient();
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
assertThat(ReadBufferManagerV2.getInstance())
.as("ReadBufferManager should be uninitialized").isNull();
intercept(IllegalStateException.class, "ReadBufferManagerV2 is not configured.", () -> {
- ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
});
// verify that multiple invocations of getBufferManager returns same instance.
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(), getConfiguration());
- ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
- ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
+ ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
assertThat(bufferManager).isNotNull();
assertThat(bufferManager2).isNotNull();
@@ -94,11 +97,12 @@ public void testDynamicScalingSwitchingOnAndOff() throws Exception {
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsClient abfsClient = fs.getAbfsStore().getClient();
AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
assertThat(bufferManagerV2.getCpuMonitoringThread())
.as("CPU Monitor thread should be initialized").isNotNull();
bufferManagerV2.resetBufferManager();
@@ -106,11 +110,12 @@ public void testDynamicScalingSwitchingOnAndOff() throws Exception {
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
try(AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsClient abfsClient = fs.getAbfsStore().getClient();
AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
assertThat(bufferManagerV2.getCpuMonitoringThread())
.as("CPU Monitor thread should not be initialized").isNull();
bufferManagerV2.resetBufferManager();
@@ -127,9 +132,9 @@ public void testThreadPoolDynamicScaling() throws Exception {
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
getAccountName());
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
int[] reqOffset = {0};
int reqLength = 1;
@@ -142,11 +147,11 @@ public void testThreadPoolDynamicScaling() throws Exception {
});
t.start();
Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
- assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isGreaterThan(2);
+ assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isGreaterThanOrEqualTo(2);
running = false;
t.join();
Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
- assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4);
+ assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThanOrEqualTo(4);
}
@Test
@@ -159,9 +164,9 @@ public void testCpuUpscaleNotAllowedIfCpuAboveThreshold() throws Exception {
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
getAccountName());
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
int[] reqOffset = {0};
int reqLength = 1;
@@ -188,9 +193,9 @@ public void testScheduledEviction() throws Exception {
Configuration configuration = getReadAheadV2Configuration();
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
getAccountName());
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
// Add a failed buffer to completed queue and set to no free buffers to read ahead.
ReadBuffer buff = new ReadBuffer();
buff.setStatus(ReadBufferStatus.READ_FAILED);
@@ -212,9 +217,9 @@ public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws Exception
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
getAccountName());
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
// Add a failed buffer to completed queue and set to no free buffers to read ahead.
ReadBuffer buff = new ReadBuffer();
buff.setStatus(ReadBufferStatus.READ_FAILED);
@@ -236,9 +241,9 @@ public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception {
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
getAccountName());
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
// Add a failed buffer to completed queue and set to no free buffers to read ahead.
ReadBuffer buff = new ReadBuffer();
buff.setStatus(ReadBufferStatus.READ_FAILED);
@@ -256,10 +261,11 @@ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2");
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
getAccountName());
+ AbfsClient abfsClient = getFileSystem().getAbfsStore().getClient();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
- ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
running = true;
@@ -283,6 +289,87 @@ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
t.join();
}
+ /**
+ * Verifies that read resource utilization metrics and thread pool stats are
+ * updated when ReadBufferManagerV2 downsizes its buffer pool under memory pressure.
+ */
+ @Test
+ public void testReadMetricUpdation() throws Exception {
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2");
+ FileSystem fileSystem = FileSystem.newInstance(configuration);
+ try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
+ AbfsClient abfsClient = abfs.getAbfsStore().getClient();
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(
+ abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters()).testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(
+ abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2
+ = ReadBufferManagerV2.getBufferManager(abfsClient.getAbfsCounters());
+
+ // --- Capture initial metrics and stats ---
+ AbfsReadResourceUtilizationMetrics metrics =
+ abfsClient.getAbfsCounters().getAbfsReadResourceUtilizationMetrics();
+
+ ReadBufferManagerV2.ReadThreadPoolStats statsBefore =
+ bufferManagerV2.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad());
+ int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
+ assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+ running = true;
+ Thread t = new Thread(() -> {
+ while (running) {
+ long maxMemory = Runtime.getRuntime().maxMemory();
+ long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ double usage = (double) usedMemory / maxMemory;
+
+ if (usage < HIGH_MEMORY_USAGE_THRESHOLD_PERCENT) {
+ // Allocate more memory
+ allocations.add(new byte[10 * 1024 * 1024]); // 10MB
+ }
+ }
+ }, "MemoryLoadThread");
+ t.setDaemon(true);
+ t.start();
+ Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+ assertThat(bufferManagerV2.getNumBuffers()).isLessThan(initialBuffers);
+ running = false;
+ t.join();
+
+ ReadBufferManagerV2.ReadThreadPoolStats statsAfter
+ = bufferManagerV2.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad());
+
+ // --- Validate that metrics and stats changed ---
+ Assertions.assertThat(statsAfter)
+ .as("Thread pool stats should update after CPU load")
+ .isNotEqualTo(statsBefore);
+
+ boolean updatedMetrics = metrics.getUpdatedAtLeastOnce();
+
+ Assertions.assertThat(updatedMetrics)
+ .as("Metrics should be updated at least once after CPU load")
+ .isTrue();
+
+ String metricsOutput = metrics.toString();
+
+ // Assertions for metrics correctness
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics output should not be empty")
+ .isNotEmpty();
+
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics must include CPU utilization data")
+ .contains("SC=");
+
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics must include memory utilization data")
+ .contains("AM=");
+
+ Assertions.assertThat(metricsOutput)
+ .as("Metrics must include current thread pool size")
+ .contains("CP=");
+ }
+ }
+
private Configuration getReadAheadV2Configuration() {
Configuration conf = new Configuration(getRawConfiguration());
conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);