[#1476] feat(spark): Provide dedicated unregister app rpc interface (#1510)
### What changes were proposed in this pull request?
Introduce dedicated unregisterApp rpc interface which is only called once when unregister shuffle
### Why are the changes needed?
Fix: [#1476](https://github.com/apache/incubator-uniffle/issues/1476)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index faa1f24..337869d 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -60,6 +60,7 @@
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.ClientResponse;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
@@ -73,6 +74,7 @@
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientUtils;
@@ -984,6 +986,8 @@
@Override
public void unregisterShuffle(String appId) {
+ RssUnregisterShuffleByAppIdRequest request = new RssUnregisterShuffleByAppIdRequest(appId);
+
if (appId == null) {
return;
}
@@ -991,7 +995,41 @@
if (appServerMap == null) {
return;
}
- appServerMap.keySet().forEach(shuffleId -> unregisterShuffle(appId, shuffleId));
+ Set<ShuffleServerInfo> shuffleServerInfos = getAllShuffleServers(appId);
+
+ ExecutorService executorService = null;
+ try {
+ executorService =
+ ThreadUtils.getDaemonFixedThreadPool(
+ Math.min(unregisterThreadPoolSize, shuffleServerInfos.size()), "unregister-shuffle");
+
+ ThreadUtils.executeTasks(
+ executorService,
+ shuffleServerInfos,
+ shuffleServerInfo -> {
+ try {
+ ShuffleServerClient client =
+ ShuffleServerClientFactory.getInstance()
+ .getShuffleServerClient(clientType, shuffleServerInfo, rssConf);
+ RssUnregisterShuffleByAppIdResponse response =
+ client.unregisterShuffleByAppId(request);
+ if (response.getStatusCode() != StatusCode.SUCCESS) {
+ LOG.warn("Failed to unregister shuffle to " + shuffleServerInfo);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error happened when unregistering to " + shuffleServerInfo, e);
+ }
+ return null;
+ },
+ unregisterRequestTimeSec,
+ "unregister shuffle server");
+
+ } finally {
+ if (executorService != null) {
+ executorService.shutdownNow();
+ }
+ shuffleServerInfoMap.remove(appId);
+ }
}
private void throwExceptionIfNecessary(ClientResponse response, String errorMsg) {
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
index eb4cd39..8be3d67 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/api/ShuffleServerClient.java
@@ -28,6 +28,7 @@
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -39,6 +40,7 @@
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
public interface ShuffleServerClient {
@@ -47,6 +49,9 @@
RssRegisterShuffleResponse registerShuffle(RssRegisterShuffleRequest request);
+ RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(
+ RssUnregisterShuffleByAppIdRequest request);
+
RssSendShuffleDataResponse sendShuffleData(RssSendShuffleDataRequest request);
RssSendCommitResponse sendCommit(RssSendCommitRequest request);
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 6bae6f6..7297aec 100644
--- a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -45,6 +45,7 @@
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.client.request.RssUnregisterShuffleByAppIdRequest;
import org.apache.uniffle.client.request.RssUnregisterShuffleRequest;
import org.apache.uniffle.client.response.RssAppHeartBeatResponse;
import org.apache.uniffle.client.response.RssFinishShuffleResponse;
@@ -56,6 +57,7 @@
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.response.RssSendCommitResponse;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
+import org.apache.uniffle.client.response.RssUnregisterShuffleByAppIdResponse;
import org.apache.uniffle.client.response.RssUnregisterShuffleResponse;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
@@ -328,6 +330,37 @@
return result;
}
+ private RssProtos.ShuffleUnregisterByAppIdResponse doUnregisterShuffleByAppId(String appId) {
+ RssProtos.ShuffleUnregisterByAppIdRequest request =
+ RssProtos.ShuffleUnregisterByAppIdRequest.newBuilder().setAppId(appId).build();
+ return blockingStub.unregisterShuffleByAppId(request);
+ }
+
+ @Override
+ public RssUnregisterShuffleByAppIdResponse unregisterShuffleByAppId(
+ RssUnregisterShuffleByAppIdRequest request) {
+ RssProtos.ShuffleUnregisterByAppIdResponse rpcResponse =
+ doUnregisterShuffleByAppId(request.getAppId());
+
+ RssUnregisterShuffleByAppIdResponse response;
+ RssProtos.StatusCode statusCode = rpcResponse.getStatus();
+
+ switch (statusCode) {
+ case SUCCESS:
+ response = new RssUnregisterShuffleByAppIdResponse(StatusCode.SUCCESS);
+ break;
+ default:
+ String msg =
+ String.format(
+ "Errors on unregister app to %s:%s for appId[%s], error: %s",
+ host, port, request.getAppId(), rpcResponse.getRetMsg());
+ LOG.error(msg);
+ throw new RssException(msg);
+ }
+
+ return response;
+ }
+
private RssProtos.ShuffleUnregisterResponse doUnregisterShuffle(String appId, int shuffleId) {
RssProtos.ShuffleUnregisterRequest request =
RssProtos.ShuffleUnregisterRequest.newBuilder()
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
new file mode 100644
index 0000000..0992355
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/request/RssUnregisterShuffleByAppIdRequest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.request;
+
+public class RssUnregisterShuffleByAppIdRequest {
+ private String appId;
+
+ public RssUnregisterShuffleByAppIdRequest(String appId) {
+ this.appId = appId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+}
diff --git a/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
new file mode 100644
index 0000000..5c01e84
--- /dev/null
+++ b/internal-client/src/main/java/org/apache/uniffle/client/response/RssUnregisterShuffleByAppIdResponse.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.client.response;
+
+import org.apache.uniffle.common.rpc.StatusCode;
+
+public class RssUnregisterShuffleByAppIdResponse extends ClientResponse {
+
+ public RssUnregisterShuffleByAppIdResponse(StatusCode statusCode) {
+ super(statusCode);
+ }
+}
diff --git a/proto/src/main/proto/Rss.proto b/proto/src/main/proto/Rss.proto
index 9f601f2..aab38ef 100644
--- a/proto/src/main/proto/Rss.proto
+++ b/proto/src/main/proto/Rss.proto
@@ -26,6 +26,7 @@
service ShuffleServer {
rpc registerShuffle (ShuffleRegisterRequest) returns (ShuffleRegisterResponse);
rpc unregisterShuffle(ShuffleUnregisterRequest) returns (ShuffleUnregisterResponse);
+ rpc unregisterShuffleByAppId(ShuffleUnregisterByAppIdRequest) returns (ShuffleUnregisterByAppIdResponse);
rpc sendShuffleData (SendShuffleDataRequest) returns (SendShuffleDataResponse);
rpc getLocalShuffleIndex (GetLocalShuffleIndexRequest) returns (GetLocalShuffleIndexResponse);
rpc getLocalShuffleData (GetLocalShuffleDataRequest) returns (GetLocalShuffleDataResponse);
@@ -197,6 +198,15 @@
string retMsg = 2;
}
+message ShuffleUnregisterByAppIdRequest {
+ string appId = 1;
+}
+
+message ShuffleUnregisterByAppIdResponse {
+ StatusCode status = 1;
+ string retMsg = 2;
+}
+
message SendShuffleDataRequest {
string appId = 1;
int32 shuffleId = 2;
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 65c2b99..ac9b95c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -100,6 +100,30 @@
}
@Override
+ public void unregisterShuffleByAppId(
+ RssProtos.ShuffleUnregisterByAppIdRequest request,
+ StreamObserver<RssProtos.ShuffleUnregisterByAppIdResponse> responseStreamObserver) {
+ String appId = request.getAppId();
+
+ StatusCode result = StatusCode.SUCCESS;
+ String responseMessage = "OK";
+ try {
+ shuffleServer.getShuffleTaskManager().removeShuffleDataAsync(appId);
+
+ } catch (Exception e) {
+ result = StatusCode.INTERNAL_ERROR;
+ }
+
+ RssProtos.ShuffleUnregisterByAppIdResponse reply =
+ RssProtos.ShuffleUnregisterByAppIdResponse.newBuilder()
+ .setStatus(result.toProto())
+ .setRetMsg(responseMessage)
+ .build();
+ responseStreamObserver.onNext(reply);
+ responseStreamObserver.onCompleted();
+ }
+
+ @Override
public void unregisterShuffle(
RssProtos.ShuffleUnregisterRequest request,
StreamObserver<RssProtos.ShuffleUnregisterResponse> responseStreamObserver) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 04d62da..7977b80 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -70,6 +70,7 @@
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.AppUnregisterPurgeEvent;
import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.StorageManager;
@@ -183,6 +184,12 @@
(System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND;
ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
}
+ if (event instanceof AppUnregisterPurgeEvent) {
+ removeResources(event.getAppId(), false);
+ double usedTime =
+ (System.currentTimeMillis() - startTime) / Constants.MILLION_SECONDS_PER_SECOND;
+ ShuffleServerMetrics.summaryTotalRemoveResourceTime.observe(usedTime);
+ }
if (event instanceof ShufflePurgeEvent) {
removeResourcesByShuffleIds(event.getAppId(), event.getShuffleIds());
double usedTime =
@@ -842,6 +849,10 @@
new ShufflePurgeEvent(appId, getUserByAppId(appId), Arrays.asList(shuffleId)));
}
+ public void removeShuffleDataAsync(String appId) {
+ expiredAppIdQueue.add(new AppUnregisterPurgeEvent(appId, getUserByAppId(appId)));
+ }
+
@VisibleForTesting
void removeShuffleDataSync(String appId, int shuffleId) {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
diff --git a/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
new file mode 100644
index 0000000..04d6318
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/event/AppUnregisterPurgeEvent.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.event;
+
+public class AppUnregisterPurgeEvent extends PurgeEvent {
+ public AppUnregisterPurgeEvent(String appId, String user) {
+ super(appId, user, null);
+ }
+}