| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.uniffle.test; |
| |
| import java.io.File; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Stream; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.Arguments; |
| import org.junit.jupiter.params.provider.MethodSource; |
| import org.roaringbitmap.longlong.Roaring64NavigableMap; |
| |
| import org.apache.uniffle.client.factory.ShuffleClientFactory; |
| import org.apache.uniffle.client.impl.ShuffleReadClientImpl; |
| import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; |
| import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient; |
| import org.apache.uniffle.client.request.RssFinishShuffleRequest; |
| import org.apache.uniffle.client.request.RssRegisterShuffleRequest; |
| import org.apache.uniffle.client.request.RssSendCommitRequest; |
| import org.apache.uniffle.client.request.RssSendShuffleDataRequest; |
| import org.apache.uniffle.client.response.RssSendShuffleDataResponse; |
| import org.apache.uniffle.common.ClientType; |
| import org.apache.uniffle.common.PartitionRange; |
| import org.apache.uniffle.common.ShuffleBlockInfo; |
| import org.apache.uniffle.common.ShuffleServerInfo; |
| import org.apache.uniffle.common.rpc.ServerType; |
| import org.apache.uniffle.common.rpc.StatusCode; |
| import org.apache.uniffle.coordinator.CoordinatorConf; |
| import org.apache.uniffle.coordinator.CoordinatorServer; |
| import org.apache.uniffle.server.ShuffleServer; |
| import org.apache.uniffle.server.ShuffleServerConf; |
| import org.apache.uniffle.storage.util.StorageType; |
| |
| import static org.junit.jupiter.api.Assertions.assertSame; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| public class DiskErrorToleranceTest extends ShuffleReadWriteBase { |
| private ShuffleServerGrpcClient grpcShuffleServerClient; |
| private ShuffleServerGrpcNettyClient nettyShuffleServerClient; |
| private static ShuffleServerConf grpcShuffleServerConfig; |
| private static ShuffleServerConf nettyShuffleServerConfig; |
| |
| private static File data1; |
| private static File data2; |
| private List<ShuffleServerInfo> grpcShuffleServerInfoList; |
| private List<ShuffleServerInfo> nettyShuffleServerInfoList; |
| |
| @BeforeAll |
| public static void setupServers(@TempDir File serverTmpDir) throws Exception { |
| data1 = new File(serverTmpDir, "data1"); |
| data2 = new File(serverTmpDir, "data2"); |
| CoordinatorConf coordinatorConf = getCoordinatorConf(); |
| createCoordinatorServer(coordinatorConf); |
| |
| ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(ServerType.GRPC); |
| createShuffleServer(grpcShuffleServerConf); |
| |
| ShuffleServerConf nettyShuffleServerConf = buildShuffleServerConf(ServerType.GRPC_NETTY); |
| createShuffleServer(nettyShuffleServerConf); |
| |
| startServers(); |
| |
| grpcShuffleServerConfig = grpcShuffleServerConf; |
| nettyShuffleServerConfig = nettyShuffleServerConf; |
| } |
| |
| @BeforeEach |
| public void createClient(@TempDir File serverTmpDir) throws Exception { |
| grpcShuffleServerClient = |
| new ShuffleServerGrpcClient( |
| LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); |
| nettyShuffleServerClient = |
| new ShuffleServerGrpcNettyClient( |
| LOCALHOST, |
| nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), |
| nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)); |
| |
| grpcShuffleServerInfoList = |
| Lists.newArrayList( |
| new ShuffleServerInfo( |
| String.format( |
| "127.0.0.1-%s", |
| grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)), |
| LOCALHOST, |
| grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT))); |
| nettyShuffleServerInfoList = |
| Lists.newArrayList( |
| new ShuffleServerInfo( |
| String.format( |
| "127.0.0.1-%s", |
| grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)), |
| LOCALHOST, |
| grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT))); |
| } |
| |
| private static ShuffleServerConf buildShuffleServerConf(ServerType serverType) throws Exception { |
| ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType); |
| shuffleServerConf.setString( |
| ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name()); |
| shuffleServerConf.set( |
| ShuffleServerConf.RSS_STORAGE_BASE_PATH, |
| Arrays.asList(data1.getAbsolutePath(), data2.getAbsolutePath())); |
| shuffleServerConf.setBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE, true); |
| return shuffleServerConf; |
| } |
| |
| @AfterEach |
| public void closeClient() throws Exception { |
| grpcShuffleServerClient.close(); |
| nettyShuffleServerClient.close(); |
| cleanCluster(); |
| } |
| |
| public static void cleanCluster() throws Exception { |
| for (CoordinatorServer coordinator : coordinators) { |
| coordinator.stopServer(); |
| } |
| for (ShuffleServer shuffleServer : grpcShuffleServers) { |
| shuffleServer.stopServer(); |
| } |
| for (ShuffleServer shuffleServer : nettyShuffleServers) { |
| shuffleServer.stopServer(); |
| } |
| grpcShuffleServers = Lists.newArrayList(); |
| nettyShuffleServers = Lists.newArrayList(); |
| coordinators = Lists.newArrayList(); |
| } |
| |
| private static Stream<Arguments> diskErrorTestProvider() { |
| return Stream.of(Arguments.of(true), Arguments.of(false)); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("diskErrorTestProvider") |
| private void diskErrorTest(boolean isNettyMode) throws Exception { |
| ShuffleServerGrpcClient shuffleServerClient = |
| isNettyMode ? nettyShuffleServerClient : grpcShuffleServerClient; |
| String appId = "ap_disk_error_data"; |
| Map<Long, byte[]> expectedData = Maps.newHashMap(); |
| Set<Long> expectedBlock1 = Sets.newHashSet(); |
| Roaring64NavigableMap blockIdBitmap1 = Roaring64NavigableMap.bitmapOf(); |
| List<ShuffleBlockInfo> blocks1 = |
| createShuffleBlockList(0, 0, 1, 3, 25, blockIdBitmap1, expectedData); |
| RssRegisterShuffleRequest rr1 = |
| new RssRegisterShuffleRequest(appId, 0, Lists.newArrayList(new PartitionRange(0, 0)), ""); |
| shuffleServerClient.registerShuffle(rr1); |
| blocks1.forEach(b -> expectedBlock1.add(b.getBlockId())); |
| Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap(); |
| partitionToBlocks.put(0, blocks1); |
| Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap(); |
| shuffleToBlocks.put(0, partitionToBlocks); |
| RssSendShuffleDataRequest rs1 = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks); |
| RssSendCommitRequest rc1 = new RssSendCommitRequest(appId, 0); |
| RssFinishShuffleRequest rf1 = new RssFinishShuffleRequest(appId, 0); |
| RssSendShuffleDataResponse response = shuffleServerClient.sendShuffleData(rs1); |
| assertSame(StatusCode.SUCCESS, response.getStatusCode()); |
| shuffleServerClient.sendCommit(rc1); |
| shuffleServerClient.finishShuffle(rf1); |
| List<ShuffleServerInfo> shuffleServerInfoList = |
| isNettyMode ? nettyShuffleServerInfoList : grpcShuffleServerInfoList; |
| ShuffleReadClientImpl readClient = |
| ShuffleClientFactory.newReadBuilder() |
| .clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC) |
| .storageType(StorageType.LOCALFILE.name()) |
| .appId(appId) |
| .shuffleId(0) |
| .partitionId(0) |
| .indexReadLimit(100) |
| .partitionNumPerRange(1) |
| .partitionNum(10) |
| .readBufferSize(1000) |
| .basePath(null) |
| .blockIdBitmap(blockIdBitmap1) |
| .taskIdBitmap(Roaring64NavigableMap.bitmapOf(1)) |
| .shuffleServerInfoList(shuffleServerInfoList) |
| .hadoopConf(conf) |
| .build(); |
| validateResult(readClient, expectedData); |
| |
| File shuffleData = new File(data2, appId); |
| assertTrue(shuffleData.exists()); |
| Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS); |
| |
| expectedData.clear(); |
| partitionToBlocks.clear(); |
| shuffleToBlocks.clear(); |
| Roaring64NavigableMap blockIdBitmap2 = Roaring64NavigableMap.bitmapOf(); |
| Set<Long> expectedBlock2 = Sets.newHashSet(); |
| List<ShuffleBlockInfo> blocks2 = |
| createShuffleBlockList(0, 0, 2, 5, 30, blockIdBitmap2, expectedData); |
| blocks2.forEach(b -> expectedBlock2.add(b.getBlockId())); |
| partitionToBlocks.put(0, blocks2); |
| shuffleToBlocks.put(0, partitionToBlocks); |
| rs1 = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks); |
| response = shuffleServerClient.sendShuffleData(rs1); |
| assertSame(StatusCode.SUCCESS, response.getStatusCode()); |
| |
| shuffleServerClient.sendCommit(rc1); |
| shuffleServerClient.finishShuffle(rf1); |
| |
| readClient = |
| ShuffleClientFactory.newReadBuilder() |
| .storageType(StorageType.LOCALFILE.name()) |
| .appId(appId) |
| .shuffleId(0) |
| .partitionId(0) |
| .indexReadLimit(100) |
| .partitionNumPerRange(1) |
| .partitionNum(10) |
| .readBufferSize(1000) |
| .basePath(null) |
| .blockIdBitmap(blockIdBitmap2) |
| .taskIdBitmap(Roaring64NavigableMap.bitmapOf(2)) |
| .shuffleServerInfoList(shuffleServerInfoList) |
| .hadoopConf(conf) |
| .build(); |
| validateResult(readClient, expectedData); |
| shuffleData = new File(data1, appId); |
| assertTrue(shuffleData.exists()); |
| } |
| } |