[#1684] fix(server): use the diskSize obtained from periodic check to determine whether is writable (#1685)
### What changes were proposed in this pull request?
Use the disk size obtained from periodic check to determine whether the disk can be written.
### Why are the changes needed?
The disk size obtained from metadata is unreliable.
Fix: #1684
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UTs
diff --git a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 5be0eb3..a0c57a5 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -109,20 +109,21 @@
}
long total = getTotalSpace(storageInfo.storageDir);
- long free = getFreeSpace(storageInfo.storageDir);
+ long availableBytes = getFreeSpace(storageInfo.storageDir);
totalSpace.addAndGet(total);
- wholeDiskUsedSpace.addAndGet(total - free);
- serviceUsedSpace.addAndGet(getServiceUsedSpace(storageInfo.storageDir));
-
- storageInfo.updateStorageFreeSpace(free);
+ wholeDiskUsedSpace.addAndGet(total - availableBytes);
+ long usedBytes = getServiceUsedSpace(storageInfo.storageDir);
+ serviceUsedSpace.addAndGet(usedBytes);
+ storageInfo.updateServiceUsedBytes(usedBytes);
+ storageInfo.updateStorageFreeSpace(availableBytes);
boolean isWritable = storageInfo.canWrite();
ShuffleServerMetrics.gaugeLocalStorageIsWritable
.labels(storageInfo.storage.getBasePath())
.set(isWritable ? 0 : 1);
- if (storageInfo.checkIsSpaceEnough(total, free)) {
+ if (storageInfo.checkIsSpaceEnough(total, availableBytes)) {
num.incrementAndGet();
}
return null;
@@ -237,16 +238,20 @@
this.storage = storage;
}
- void updateStorageFreeSpace(long free) {
- storage.updateDiskFree(free);
+ void updateStorageFreeSpace(long availableBytes) {
+ storage.updateDiskAvailableBytes(availableBytes);
}
- boolean checkIsSpaceEnough(long total, long free) {
+ void updateServiceUsedBytes(long usedBytes) {
+ storage.updateServiceUsedBytes(usedBytes);
+ }
+
+ boolean checkIsSpaceEnough(long total, long availableBytes) {
if (Double.compare(0.0, total) == 0) {
this.isHealthy = false;
return false;
}
- double usagePercent = (total - free) * 100.0 / total;
+ double usagePercent = (total - availableBytes) * 100.0 / total;
if (isHealthy) {
if (Double.compare(usagePercent, diskMaxUsagePercentage) >= 0) {
isHealthy = false;
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 772aae1..bd6ca21 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -380,7 +380,7 @@
for (LocalStorage storage : localStorages) {
String mountPoint = storage.getMountPoint();
long capacity = storage.getCapacity();
- long wroteBytes = storage.getDiskSize();
+ long wroteBytes = storage.getServiceUsedBytes();
StorageStatus status = StorageStatus.NORMAL;
if (storage.isCorrupted()) {
status = StorageStatus.UNHEALTHY;
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 274c2cb..2b21445 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -343,6 +343,7 @@
int storageIndex1 = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
validateLocalMetadata(storageManager, storageIndex1, 1010L);
+ storageManager.getStorageChecker().checkIsHealthy();
flushEvent = createShuffleDataFlushEvent(appId, 3, 1, 1, 10, 102, null);
manager.addToFlushQueue(flushEvent);
// wait for write data
@@ -737,6 +738,7 @@
Thread.sleep(1000);
assertTrue(event.getUnderStorage() instanceof LocalStorage);
+ storageManager.getStorageChecker().checkIsHealthy();
// case2: huge event is written to cold storage directly
event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
flushManager.addToFlushQueue(event);
@@ -753,6 +755,7 @@
((HybridStorageManager) storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager) storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
+ storageManager.getStorageChecker().checkIsHealthy();
event = createShuffleDataFlushEvent(appId, 3, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
Thread.sleep(1000);
@@ -793,6 +796,7 @@
assertEquals(0, event.getRetryTimes());
assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
+ storageManager.getStorageChecker().checkIsHealthy();
// case2: huge event is written to cold storage directly
event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
flushManager.addToFlushQueue(event);
@@ -809,6 +813,7 @@
((HybridStorageManager) storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager) storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
+ storageManager.getStorageChecker().checkIsHealthy();
event = createShuffleDataFlushEvent(appId, 2, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
waitForFlush(flushManager, appId, 2, 5);
diff --git a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 76371fd..7561709 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -46,7 +46,8 @@
public static final String STORAGE_HOST = "local";
private final long diskCapacity;
- private volatile long diskFree;
+ private volatile long diskAvailableBytes;
+ private volatile long serviceUsedBytes;
// for test cases
private boolean enableDiskCapacityCheck = false;
@@ -81,7 +82,7 @@
throw new RssException(ioe);
}
this.diskCapacity = baseFolder.getTotalSpace();
- this.diskFree = baseFolder.getUsableSpace();
+ this.diskAvailableBytes = baseFolder.getUsableSpace();
if (capacity < 0L) {
this.capacity = (long) (diskCapacity * builder.ratio);
@@ -91,7 +92,7 @@
builder.ratio,
diskCapacity);
} else {
- final long freeSpace = diskFree;
+ final long freeSpace = diskAvailableBytes;
if (freeSpace < capacity) {
throw new IllegalArgumentException(
"The Disk of "
@@ -167,14 +168,14 @@
if (isSpaceEnough) {
serviceUsedCapacityCheck =
- metaData.getDiskSize().doubleValue() * 100 / capacity < highWaterMarkOfWrite;
+ (double) (serviceUsedBytes * 100) / capacity < highWaterMarkOfWrite;
diskUsedCapacityCheck =
- ((double) (diskCapacity - diskFree)) * 100 / diskCapacity < highWaterMarkOfWrite;
+ ((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity
+ < highWaterMarkOfWrite;
} else {
- serviceUsedCapacityCheck =
- metaData.getDiskSize().doubleValue() * 100 / capacity < lowWaterMarkOfWrite;
+ serviceUsedCapacityCheck = (double) (serviceUsedBytes * 100) / capacity < lowWaterMarkOfWrite;
diskUsedCapacityCheck =
- ((double) (diskCapacity - diskFree)) * 100 / diskCapacity < lowWaterMarkOfWrite;
+ ((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity < lowWaterMarkOfWrite;
}
isSpaceEnough =
serviceUsedCapacityCheck && (enableDiskCapacityCheck ? diskUsedCapacityCheck : true);
@@ -203,10 +204,6 @@
metaData.updateShuffleLastReadTs(shuffleKey);
}
- public long getDiskSize() {
- return metaData.getDiskSize().longValue();
- }
-
@VisibleForTesting
public LocalStorageMeta getMetaData() {
return metaData;
@@ -266,8 +263,16 @@
return appIds;
}
- public void updateDiskFree(long free) {
- this.diskFree = free;
+ public void updateDiskAvailableBytes(long bytes) {
+ this.diskAvailableBytes = bytes;
+ }
+
+ public void updateServiceUsedBytes(long usedBytes) {
+ this.serviceUsedBytes = usedBytes;
+ }
+
+ public long getServiceUsedBytes() {
+ return serviceUsedBytes;
}
// Only for test
diff --git a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 68b4aaf..4ce4d1b 100644
--- a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++ b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -76,15 +76,15 @@
public void canWriteTest() {
LocalStorage item = createTestStorage(testBaseDir);
- item.getMetaData().updateDiskSize(20);
+ item.updateServiceUsedBytes(20);
assertTrue(item.canWrite());
- item.getMetaData().updateDiskSize(65);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() + 65);
assertTrue(item.canWrite());
- item.getMetaData().updateDiskSize(10);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() + 10);
assertFalse(item.canWrite());
- item.getMetaData().updateDiskSize(-10);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() - 10);
assertFalse(item.canWrite());
- item.getMetaData().updateDiskSize(-10);
+ item.updateServiceUsedBytes(item.getServiceUsedBytes() - 10);
assertTrue(item.canWrite());
}
@@ -178,18 +178,18 @@
.enableDiskCapacityWatermarkCheck()
.build();
- localStorage.getMetaData().updateDiskSize(20);
+ localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() + 20);
assertTrue(localStorage.canWrite());
- localStorage.getMetaData().updateDiskSize(65);
+ localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() + 65);
assertTrue(localStorage.canWrite());
final long diskCapacity = localStorage.getDiskCapacity();
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.96)));
assertFalse(localStorage.canWrite());
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.60)));
assertFalse(localStorage.canWrite());
- localStorage.getMetaData().updateDiskSize(-10);
+ localStorage.updateServiceUsedBytes(localStorage.getServiceUsedBytes() - 10);
assertTrue(localStorage.canWrite());
// capacity = diskCapacity
@@ -203,10 +203,10 @@
.enableDiskCapacityWatermarkCheck()
.build();
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.96)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.96)));
assertFalse(localStorage.canWrite());
- localStorage.updateDiskFree((long) (diskCapacity * (1 - 0.60)));
+ localStorage.updateDiskAvailableBytes((long) (diskCapacity * (1 - 0.60)));
assertTrue(localStorage.canWrite());
}
}