[#1645] feat(server): Add gauge metrics for reading localfile data (#1646)
### What changes were proposed in this pull request?
Add two metrics for each reading memory/index/local files:
1. Add a gauge metric for its threads number.
2. Add a gauge metric for its reading size.
### Why are the changes needed?
Fix https://github.com/apache/incubator-uniffle/issues/1645.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing UTs.
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 9f8f79e..47a0e80 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -693,6 +693,8 @@
ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
+ ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
+ ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
shuffleServer
.getGrpcMetrics()
.recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD, readTime);
@@ -718,6 +720,8 @@
} finally {
if (sdr != null) {
sdr.release();
+ ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
+ ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(length);
}
shuffleServer.getShuffleBufferManager().releaseReadMemory(length);
}
@@ -769,18 +773,23 @@
if (shuffleServer.getShuffleBufferManager().requireReadMemory(assumedFileSize)) {
ShuffleIndexResult shuffleIndexResult = null;
try {
- long start = System.currentTimeMillis();
+ final long start = System.currentTimeMillis();
shuffleIndexResult =
shuffleServer
.getShuffleTaskManager()
.getShuffleIndex(appId, shuffleId, partitionId, partitionNumPerRange, partitionNum);
- long readTime = System.currentTimeMillis() - start;
ByteBuffer data = shuffleIndexResult.getIndexData();
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.remaining());
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.remaining());
+ ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
+ ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
GetLocalShuffleIndexResponse.Builder builder =
GetLocalShuffleIndexResponse.newBuilder().setStatus(status.toProto()).setRetMsg(msg);
+ long readTime = System.currentTimeMillis() - start;
+ shuffleServer
+ .getGrpcMetrics()
+ .recordProcessTime(ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD, readTime);
LOG.info(
"Successfully getShuffleIndex cost {} ms for {}" + " bytes with {}",
readTime,
@@ -808,6 +817,8 @@
} finally {
if (shuffleIndexResult != null) {
shuffleIndexResult.release();
+ ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
+ ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(assumedFileSize);
}
shuffleServer.getShuffleBufferManager().releaseReadMemory(assumedFileSize);
}
@@ -845,7 +856,6 @@
ShuffleServerGrpcMetrics.GET_MEMORY_SHUFFLE_DATA_METHOD, transportTime);
}
}
- long start = System.currentTimeMillis();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetMemoryShuffleDataResponse reply;
@@ -856,6 +866,7 @@
if (shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
ShuffleDataResult shuffleDataResult = null;
try {
+ final long start = System.currentTimeMillis();
Roaring64NavigableMap expectedTaskIds = null;
if (request.getSerializedExpectedTaskIdsBitmap() != null
&& !request.getSerializedExpectedTaskIdsBitmap().isEmpty()) {
@@ -875,6 +886,8 @@
bufferSegments = shuffleDataResult.getBufferSegments();
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.length);
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.length);
+ ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
+ ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
}
long costTime = System.currentTimeMillis() - start;
shuffleServer
@@ -911,6 +924,8 @@
} finally {
if (shuffleDataResult != null) {
shuffleDataResult.release();
+ ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
+ ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
}
shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index f1f3736..b4566b3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -54,6 +54,13 @@
"localfile_flush_thread_pool_queue_size";
private static final String FALLBACK_FLUSH_THREAD_POOL_QUEUE_SIZE =
"fallback_flush_thread_pool_queue_size";
+ private static final String READ_LOCAL_DATA_FILE_THREAD_NUM = "read_local_data_file_thread_num";
+ private static final String READ_LOCAL_INDEX_FILE_THREAD_NUM = "read_local_index_file_thread_num";
+ private static final String READ_MEMORY_DATA_THREAD_NUM = "read_memory_data_thread_num";
+ private static final String READ_LOCAL_DATA_FILE_BUFFER_SIZE = "read_local_data_file_buffer_size";
+ private static final String READ_LOCAL_INDEX_FILE_BUFFER_SIZE =
+ "read_local_index_file_buffer_size";
+ private static final String READ_MEMORY_DATA_BUFFER_SIZE = "read_memory_data_buffer_size";
private static final String TOTAL_READ_DATA = "total_read_data";
private static final String TOTAL_READ_LOCAL_DATA_FILE = "total_read_local_data_file";
private static final String TOTAL_READ_LOCAL_INDEX_FILE = "total_read_local_index_file";
@@ -196,6 +203,12 @@
public static Gauge.Child gaugeFallbackFlushThreadPoolQueueSize;
public static Gauge.Child gaugeAppNum;
public static Gauge.Child gaugeTotalPartitionNum;
+ public static Gauge.Child gaugeReadLocalDataFileThreadNum;
+ public static Gauge.Child gaugeReadLocalIndexFileThreadNum;
+ public static Gauge.Child gaugeReadMemoryDataThreadNum;
+ public static Gauge.Child gaugeReadLocalDataFileBufferSize;
+ public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
+ public static Gauge.Child gaugeReadMemoryDataBufferSize;
public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
@@ -401,6 +414,17 @@
gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);
gaugeTotalPartitionNum = metricsManager.addLabeledGauge(PARTITION_NUM_WITH_NODE);
+ gaugeReadLocalDataFileThreadNum =
+ metricsManager.addLabeledGauge(READ_LOCAL_DATA_FILE_THREAD_NUM);
+ gaugeReadLocalIndexFileThreadNum =
+ metricsManager.addLabeledGauge(READ_LOCAL_INDEX_FILE_THREAD_NUM);
+ gaugeReadMemoryDataThreadNum = metricsManager.addLabeledGauge(READ_MEMORY_DATA_THREAD_NUM);
+ gaugeReadLocalDataFileBufferSize =
+ metricsManager.addLabeledGauge(READ_LOCAL_DATA_FILE_BUFFER_SIZE);
+ gaugeReadLocalIndexFileBufferSize =
+ metricsManager.addLabeledGauge(READ_LOCAL_INDEX_FILE_BUFFER_SIZE);
+ gaugeReadMemoryDataBufferSize = metricsManager.addLabeledGauge(READ_MEMORY_DATA_BUFFER_SIZE);
+
gaugeHugePartitionNum = metricsManager.addLabeledGauge(HUGE_PARTITION_NUM);
gaugeAppWithHugePartitionNum = metricsManager.addLabeledGauge(APP_WITH_HUGE_PARTITION_NUM);
diff --git a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
index ed83c0b..dbda25a 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java
@@ -26,6 +26,7 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -255,7 +256,6 @@
.recordTransportTime(GetMemoryShuffleDataRequest.class.getName(), transportTime);
}
}
- final long start = System.currentTimeMillis();
StatusCode status = StatusCode.SUCCESS;
String msg = "OK";
GetMemoryShuffleDataResponse response;
@@ -266,6 +266,7 @@
if (shuffleServer.getShuffleBufferManager().requireReadMemory(readBufferSize)) {
ShuffleDataResult shuffleDataResult = null;
try {
+ final long start = System.currentTimeMillis();
shuffleDataResult =
shuffleServer
.getShuffleTaskManager()
@@ -283,12 +284,14 @@
bufferSegments = shuffleDataResult.getBufferSegments();
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
ShuffleServerMetrics.counterTotalReadMemoryDataSize.inc(data.size());
+ ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.inc();
+ ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.inc(readBufferSize);
}
response =
new GetMemoryShuffleDataResponse(req.getRequestId(), status, msg, bufferSegments, data);
ReleaseMemoryAndRecordReadTimeListener listener =
new ReleaseMemoryAndRecordReadTimeListener(
- start, readBufferSize, data.size(), requestInfo, req, client);
+ start, readBufferSize, data.size(), requestInfo, req, response, client);
client.getChannel().writeAndFlush(response).addListener(listener);
return;
} catch (Exception e) {
@@ -359,12 +362,14 @@
ManagedBuffer data = shuffleIndexResult.getManagedBuffer();
ShuffleServerMetrics.counterTotalReadDataSize.inc(data.size());
ShuffleServerMetrics.counterTotalReadLocalIndexFileSize.inc(data.size());
+ ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.inc();
+ ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.inc(assumedFileSize);
response =
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, data, shuffleIndexResult.getDataFileLen());
ReleaseMemoryAndRecordReadTimeListener listener =
new ReleaseMemoryAndRecordReadTimeListener(
- start, assumedFileSize, data.size(), requestInfo, req, client);
+ start, assumedFileSize, data.size(), requestInfo, req, response, client);
client.getChannel().writeAndFlush(response).addListener(listener);
return;
} catch (FileNotFoundException indexFileNotFoundException) {
@@ -465,12 +470,14 @@
length);
ShuffleServerMetrics.counterTotalReadDataSize.inc(sdr.getDataLength());
ShuffleServerMetrics.counterTotalReadLocalDataFileSize.inc(sdr.getDataLength());
+ ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.inc();
+ ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.inc(length);
response =
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, sdr.getManagedBuffer());
ReleaseMemoryAndRecordReadTimeListener listener =
new ReleaseMemoryAndRecordReadTimeListener(
- start, length, sdr.getDataLength(), requestInfo, req, client);
+ start, length, sdr.getDataLength(), requestInfo, req, response, client);
client.getChannel().writeAndFlush(response).addListener(listener);
return;
} catch (Exception e) {
@@ -531,6 +538,7 @@
private final long dataSize;
private final String requestInfo;
private final RequestMessage request;
+ private final RpcResponse response;
private final TransportClient client;
ReleaseMemoryAndRecordReadTimeListener(
@@ -539,12 +547,14 @@
long dataSize,
String requestInfo,
RequestMessage request,
+ RpcResponse response,
TransportClient client) {
this.readStartedTime = readStartedTime;
this.readBufferSize = readBufferSize;
this.dataSize = dataSize;
this.requestInfo = requestInfo;
this.request = request;
+ this.response = response;
this.client = client;
}
@@ -554,6 +564,20 @@
long readTime = System.currentTimeMillis() - readStartedTime;
ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), readTime);
+ if (request instanceof GetLocalShuffleDataRequest) {
+ ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
+ ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(readBufferSize);
+ } else if (request instanceof GetLocalShuffleIndexRequest) {
+ ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
+ ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(readBufferSize);
+ } else if (request instanceof GetMemoryShuffleDataRequest) {
+ GetMemoryShuffleDataResponse getMemoryShuffleDataResponse =
+ (GetMemoryShuffleDataResponse) response;
+ if (CollectionUtils.isNotEmpty(getMemoryShuffleDataResponse.getBufferSegments())) {
+ ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
+ ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
+ }
+ }
if (!future.isSuccess()) {
Throwable cause = future.cause();
String errorMsg =