| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.uniffle.test; |
| |
| import java.io.File; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Stream; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.Arguments; |
| import org.junit.jupiter.params.provider.MethodSource; |
| import org.roaringbitmap.longlong.Roaring64NavigableMap; |
| |
| import org.apache.uniffle.client.factory.ShuffleClientFactory; |
| import org.apache.uniffle.client.impl.ShuffleReadClientImpl; |
| import org.apache.uniffle.client.impl.ShuffleWriteClientImpl; |
| import org.apache.uniffle.client.response.SendShuffleDataResult; |
| import org.apache.uniffle.common.ClientType; |
| import org.apache.uniffle.common.PartitionRange; |
| import org.apache.uniffle.common.RemoteStorageInfo; |
| import org.apache.uniffle.common.ShuffleBlockInfo; |
| import org.apache.uniffle.common.ShuffleDataDistributionType; |
| import org.apache.uniffle.common.ShuffleServerInfo; |
| import org.apache.uniffle.common.rpc.ServerType; |
| import org.apache.uniffle.coordinator.CoordinatorConf; |
| import org.apache.uniffle.coordinator.CoordinatorServer; |
| import org.apache.uniffle.server.MockedGrpcServer; |
| import org.apache.uniffle.server.MockedShuffleServer; |
| import org.apache.uniffle.server.ShuffleServer; |
| import org.apache.uniffle.server.ShuffleServerConf; |
| import org.apache.uniffle.storage.util.StorageType; |
| |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.fail; |
| |
| public class RpcClientRetryTest extends ShuffleReadWriteBase { |
| |
| private static ShuffleServerInfo shuffleServerInfo0; |
| private static ShuffleServerInfo shuffleServerInfo1; |
| private static ShuffleServerInfo shuffleServerInfo2; |
| private static MockedShuffleWriteClientImpl shuffleWriteClientImpl; |
| |
| private ShuffleClientFactory.ReadClientBuilder baseReadBuilder(StorageType storageType) { |
| return ShuffleClientFactory.newReadBuilder() |
| .storageType(storageType.name()) |
| .shuffleId(0) |
| .partitionId(0) |
| .indexReadLimit(100) |
| .partitionNumPerRange(1) |
| .partitionNum(10) |
| .readBufferSize(1000); |
| } |
| |
| public static MockedShuffleServer createMockedShuffleServer(int id, File tmpDir) |
| throws Exception { |
| ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC); |
| File dataDir1 = new File(tmpDir, id + "_1"); |
| File dataDir2 = new File(tmpDir, id + "_2"); |
| String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); |
| shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name()); |
| shuffleServerConf.setString("rss.storage.basePath", basePath); |
| shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 5.0); |
| shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 15.0); |
| shuffleServerConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 600L); |
| return new MockedShuffleServer(shuffleServerConf); |
| } |
| |
| @BeforeAll |
| public static void initCluster(@TempDir File tmpDir) throws Exception { |
| CoordinatorConf coordinatorConf = getCoordinatorConf(); |
| createCoordinatorServer(coordinatorConf); |
| |
| grpcShuffleServers.add(createMockedShuffleServer(0, tmpDir)); |
| grpcShuffleServers.add(createMockedShuffleServer(1, tmpDir)); |
| grpcShuffleServers.add(createMockedShuffleServer(2, tmpDir)); |
| |
| shuffleServerInfo0 = |
| new ShuffleServerInfo( |
| String.format("127.0.0.1-%s", grpcShuffleServers.get(0).getGrpcPort()), |
| grpcShuffleServers.get(0).getIp(), |
| grpcShuffleServers.get(0).getGrpcPort()); |
| shuffleServerInfo1 = |
| new ShuffleServerInfo( |
| String.format("127.0.0.1-%s", grpcShuffleServers.get(1).getGrpcPort()), |
| grpcShuffleServers.get(1).getIp(), |
| grpcShuffleServers.get(1).getGrpcPort()); |
| shuffleServerInfo2 = |
| new ShuffleServerInfo( |
| String.format("127.0.0.1-%s", grpcShuffleServers.get(2).getGrpcPort()), |
| grpcShuffleServers.get(2).getIp(), |
| grpcShuffleServers.get(2).getGrpcPort()); |
| for (CoordinatorServer coordinator : coordinators) { |
| coordinator.start(); |
| } |
| for (ShuffleServer shuffleServer : grpcShuffleServers) { |
| shuffleServer.start(); |
| } |
| } |
| |
| public static void cleanCluster() throws Exception { |
| for (CoordinatorServer coordinator : coordinators) { |
| coordinator.stopServer(); |
| } |
| for (ShuffleServer shuffleServer : grpcShuffleServers) { |
| shuffleServer.stopServer(); |
| } |
| grpcShuffleServers = Lists.newArrayList(); |
| coordinators = Lists.newArrayList(); |
| } |
| |
| @AfterAll |
| public static void cleanEnv() throws Exception { |
| if (shuffleWriteClientImpl != null) { |
| shuffleWriteClientImpl.close(); |
| } |
| cleanCluster(); |
| } |
| |
| private static Stream<Arguments> testRpcRetryLogicProvider() { |
| return Stream.of( |
| Arguments.of(StorageType.MEMORY_LOCALFILE), |
| // According to SERVER_BUFFER_CAPACITY & SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, |
| // data will be flushed to disk, so read from disk only |
| Arguments.of(StorageType.LOCALFILE)); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("testRpcRetryLogicProvider") |
| public void testRpcRetryLogic(StorageType storageType) { |
| String testAppId = "testRpcRetryLogic"; |
| registerShuffleServer(testAppId, 3, 2, 2, true); |
| Map<Long, byte[]> expectedData = Maps.newHashMap(); |
| Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf(); |
| |
| List<ShuffleBlockInfo> blocks = |
| createShuffleBlockList( |
| 0, |
| 0, |
| 0, |
| 3, |
| 25, |
| blockIdBitmap, |
| expectedData, |
| Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2)); |
| |
| SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks); |
| Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf(); |
| Roaring64NavigableMap successfulBlockIdBitmap = Roaring64NavigableMap.bitmapOf(); |
| for (Long blockId : result.getSuccessBlockIds()) { |
| successfulBlockIdBitmap.addLong(blockId); |
| } |
| for (Long blockId : result.getFailedBlockIds()) { |
| failedBlockIdBitmap.addLong(blockId); |
| } |
| assertEquals(0, failedBlockIdBitmap.getLongCardinality()); |
| assertEquals(blockIdBitmap, successfulBlockIdBitmap); |
| |
| Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0); |
| |
| ShuffleReadClientImpl readClient1 = |
| baseReadBuilder(storageType) |
| .appId(testAppId) |
| .blockIdBitmap(blockIdBitmap) |
| .taskIdBitmap(taskIdBitmap) |
| .shuffleServerInfoList( |
| Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2)) |
| .retryMax(3) |
| .retryIntervalMax(1) |
| .build(); |
| |
| // The data cannot be read because the maximum number of retries is 3 |
| enableFirstNReadRequestsToFail(4); |
| try { |
| validateResult(readClient1, expectedData); |
| fail(); |
| } catch (Exception e) { |
| // do nothing |
| } |
| disableFirstNReadRequestsToFail(); |
| |
| ShuffleReadClientImpl readClient2 = |
| baseReadBuilder(storageType) |
| .appId(testAppId) |
| .blockIdBitmap(blockIdBitmap) |
| .taskIdBitmap(taskIdBitmap) |
| .shuffleServerInfoList( |
| Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2)) |
| .retryMax(3) |
| .retryIntervalMax(1) |
| .build(); |
| |
| // The data can be read because the reader will retry |
| enableFirstNReadRequestsToFail(1); |
| validateResult(readClient2, expectedData); |
| disableFirstNReadRequestsToFail(); |
| } |
| |
| private static void enableFirstNReadRequestsToFail(int failedCount) { |
| for (ShuffleServer server : grpcShuffleServers) { |
| ((MockedGrpcServer) server.getServer()) |
| .getService() |
| .enableFirstNReadRequestToFail(failedCount); |
| } |
| } |
| |
| private static void disableFirstNReadRequestsToFail() { |
| for (ShuffleServer server : grpcShuffleServers) { |
| ((MockedGrpcServer) server.getServer()).getService().resetFirstNReadRequestToFail(); |
| } |
| } |
| |
| static class MockedShuffleWriteClientImpl extends ShuffleWriteClientImpl { |
| MockedShuffleWriteClientImpl(ShuffleClientFactory.WriteClientBuilder builder) { |
| super(builder); |
| } |
| |
| public SendShuffleDataResult sendShuffleData( |
| String appId, List<ShuffleBlockInfo> shuffleBlockInfoList) { |
| return super.sendShuffleData(appId, shuffleBlockInfoList, () -> false); |
| } |
| } |
| |
| private void registerShuffleServer( |
| String testAppId, int replica, int replicaWrite, int replicaRead, boolean replicaSkip) { |
| |
| shuffleWriteClientImpl = |
| new MockedShuffleWriteClientImpl( |
| ShuffleClientFactory.newWriteBuilder() |
| .clientType(ClientType.GRPC.name()) |
| .retryMax(3) |
| .retryIntervalMax(1000) |
| .heartBeatThreadNum(1) |
| .replica(replica) |
| .replicaWrite(replicaWrite) |
| .replicaRead(replicaRead) |
| .replicaSkipEnabled(replicaSkip) |
| .dataTransferPoolSize(1) |
| .dataCommitPoolSize(1) |
| .unregisterThreadPoolSize(10) |
| .unregisterRequestTimeSec(10)); |
| |
| List<ShuffleServerInfo> allServers = |
| Lists.newArrayList(shuffleServerInfo0, shuffleServerInfo1, shuffleServerInfo2); |
| |
| for (int i = 0; i < replica; i++) { |
| shuffleWriteClientImpl.registerShuffle( |
| allServers.get(i), |
| testAppId, |
| 0, |
| Lists.newArrayList(new PartitionRange(0, 0)), |
| new RemoteStorageInfo(""), |
| ShuffleDataDistributionType.NORMAL, |
| 1); |
| } |
| } |
| } |