blob: 5f76be5be6ca6817f8a111a76f0afb92dda1fcc6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
private ShuffleServerGrpcClient shuffleServerClient;
private static String REMOTE_STORAGE = HDFS_URI + "rss/multi_storage_fault";
@BeforeAll
public static void setupServers() throws Exception {
final CoordinatorConf coordinatorConf = getCoordinatorConf();
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
String basePath = generateBasePath();
shuffleServerConf.setDouble(ShuffleServerConf.CLEANUP_THRESHOLD, 0.0);
shuffleServerConf.setDouble(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE, 100.0);
shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 100);
shuffleServerConf.setLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 30L);
shuffleServerConf.setLong(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS, 5000L);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 60L * 1000L * 60L);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 1000L);
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 400L * 1024L * 1024L);
createAndStartServers(shuffleServerConf, coordinatorConf);
}
@BeforeEach
public void createClient() {
shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
}
@AfterEach
public void closeClient() {
shuffleServerClient.close();
}
@Test
public void hdfsFallbackTest() throws Exception {
String appId = "fallback_test";
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(0, Lists.newArrayList(0));
registerShuffle(appId, map);
Roaring64NavigableMap blockBitmap = Roaring64NavigableMap.bitmapOf();
final List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 40, 10 * 1024 * 1024, blockBitmap, expectedData);
assertEquals(1, cluster.getDataNodes().size());
cluster.stopDataNode(0);
assertEquals(0, cluster.getDataNodes().size());
sendSinglePartitionToShuffleServer(appId, 0, 0, 0, blocks);
validateResult(appId, 0, 0, blockBitmap, Roaring64NavigableMap.bitmapOf(0), expectedData);
}
private void registerShuffle(String appId, Map<Integer, List<Integer>> registerMap) {
for (Map.Entry<Integer, List<Integer>> entry : registerMap.entrySet()) {
for (int partition : entry.getValue()) {
RssRegisterShuffleRequest rr = new RssRegisterShuffleRequest(appId, entry.getKey(),
Lists.newArrayList(new PartitionRange(partition, partition)), REMOTE_STORAGE);
shuffleServerClient.registerShuffle(rr);
}
}
}
private void sendSinglePartitionToShuffleServer(String appId, int shuffle, int partition,
long taskAttemptId, List<ShuffleBlockInfo> blocks) {
Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
partitionToBlocks.put(partition, blocks);
shuffleToBlocks.put(shuffle, partitionToBlocks);
RssSendShuffleDataRequest rs = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
shuffleServerClient.sendShuffleData(rs);
RssSendCommitRequest rc = new RssSendCommitRequest(appId, shuffle);
shuffleServerClient.sendCommit(rc);
RssFinishShuffleRequest rf = new RssFinishShuffleRequest(appId, shuffle);
shuffleServerClient.finishShuffle(rf);
Map<Integer, List<Long>> partitionToBlockIds = Maps.newHashMap();
Set<Long> expectBlockIds = getExpectBlockIds(blocks);
partitionToBlockIds.put(shuffle, new ArrayList<>(expectBlockIds));
RssReportShuffleResultRequest rrp = new RssReportShuffleResultRequest(
appId, shuffle, taskAttemptId, partitionToBlockIds, 1);
shuffleServerClient.reportShuffleResult(rrp);
}
protected void validateResult(String appId, int shuffleId, int partitionId, Roaring64NavigableMap blockBitmap,
Roaring64NavigableMap taskBitmap, Map<Long, byte[]> expectedData) {
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE_HDFS.name(),
appId, shuffleId, partitionId, 100, 1, 10, 1000, REMOTE_STORAGE, blockBitmap, taskBitmap,
Lists.newArrayList(new ShuffleServerInfo("test", LOCALHOST, SHUFFLE_SERVER_PORT)), conf, new DefaultIdHelper());
CompressedShuffleBlock csb = readClient.readShuffleBlockData();
Roaring64NavigableMap matched = Roaring64NavigableMap.bitmapOf();
while (csb != null && csb.getByteBuffer() != null) {
for (Map.Entry<Long, byte[]> entry : expectedData.entrySet()) {
if (compareByte(entry.getValue(), csb.getByteBuffer())) {
matched.addLong(entry.getKey());
break;
}
}
csb = readClient.readShuffleBlockData();
}
assertTrue(blockBitmap.equals(matched));
}
private Set<Long> getExpectBlockIds(List<ShuffleBlockInfo> blocks) {
List<Long> expectBlockIds = Lists.newArrayList();
blocks.forEach(b -> expectBlockIds.add(b.getBlockId()));
return Sets.newHashSet(expectBlockIds);
}
}