blob: d87badafcc16854163c84b6dc4ae46f68dba9a92 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.test;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
 * Integration tests for {@link ShuffleWriteClientImpl} and {@link ShuffleReadClientImpl} against
 * one coordinator and two shuffle servers that use {@code LOCALFILE} storage.
 *
 * <p>The coordinator and both servers are created once for the whole class in {@link
 * #setupServers(File)}; a new write client is built before each test and closed after it.
 */
public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {

  private static ShuffleServerInfo shuffleServerInfo1;
  private static ShuffleServerInfo shuffleServerInfo2;
  private ShuffleWriteClientImpl shuffleWriteClientImpl;

  /**
   * Starts the coordinator and two shuffle servers, each with its own pair of data directories
   * under {@code tmpDir}, then records client-side handles for both servers.
   *
   * <p>{@code rss.server.app.expired.withoutHeartbeat} is set to 4000 so that an idle app's
   * resources are removed quickly; {@link #writeReadTest()} waits on that.
   *
   * @param tmpDir JUnit-managed temporary directory that holds the servers' data directories
   */
  @BeforeAll
  public static void setupServers(@TempDir File tmpDir) throws Exception {
    CoordinatorConf coordinatorConf = getCoordinatorConf();
    createCoordinatorServer(coordinatorConf);

    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
    shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
    shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());

    // First server: default ports.
    shuffleServerConf.setString("rss.storage.basePath", localBasePath(tmpDir, "data1", "data2"));
    createShuffleServer(shuffleServerConf);

    // Second server: the same conf object is reused with its own data dirs and ports.
    shuffleServerConf.setString("rss.storage.basePath", localBasePath(tmpDir, "data3", "data4"));
    shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
    shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
    createShuffleServer(shuffleServerConf);

    startServers();

    // NOTE(review): both handles (and the fake server in rpcFailTest) reuse the id
    // "127.0.0.1-20001" although they point at different ports; confirm nothing keys on the id.
    shuffleServerInfo1 =
        new ShuffleServerInfo(
            "127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT);
    shuffleServerInfo2 =
        new ShuffleServerInfo(
            "127.0.0.1-20001", shuffleServers.get(1).getIp(), SHUFFLE_SERVER_PORT + 1);
  }

  /** Builds a fresh gRPC write client with single-replica settings for each test. */
  @BeforeEach
  public void createClient() {
    shuffleWriteClientImpl =
        ShuffleClientFactory.newWriteBuilder()
            .clientType(ClientType.GRPC.name())
            .retryMax(3)
            .retryIntervalMax(1000)
            .heartBeatThreadNum(1)
            .replica(1)
            .replicaWrite(1)
            .replicaRead(1)
            .replicaSkipEnabled(true)
            .dataTransferPoolSize(1)
            .dataCommitPoolSize(1)
            .unregisterThreadPoolSize(10)
            .unregisterRequestTimeSec(10)
            .build();
  }

  /**
   * Closes the per-test write client. The null check keeps a failure in {@link #createClient()}
   * from being masked by a NullPointerException here.
   */
  @AfterEach
  public void closeClient() {
    if (shuffleWriteClientImpl != null) {
      shuffleWriteClientImpl.close();
    }
  }

  /**
   * Sends blocks that target one live server and one unreachable server. Sending and result
   * reporting are expected to succeed through the live server, while the commit, which includes
   * the unreachable server, is expected to fail.
   */
  @Test
  public void rpcFailTest() throws Exception {
    String testAppId = "rpcFailTest";
    registerSinglePartition(shuffleServerInfo1, testAppId, 0, 0);
    Map<Long, byte[]> expectedData = Maps.newHashMap();
    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();

    // Simulate a failed server: setupServers starts no shuffle server on SHUFFLE_SERVER_PORT + 100.
    ShuffleServerInfo fakeShuffleServerInfo =
        new ShuffleServerInfo(
            "127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT + 100);
    List<ShuffleBlockInfo> blocks =
        createShuffleBlockList(
            0,
            0,
            0,
            3,
            25,
            blockIdBitmap,
            expectedData,
            Lists.newArrayList(shuffleServerInfo1, fakeShuffleServerInfo));
    SendShuffleDataResult result =
        shuffleWriteClientImpl.sendShuffleData(testAppId, blocks, () -> false);

    Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
    for (Long blockId : result.getFailedBlockIds()) {
      failedBlockIdBitmap.addLong(blockId);
    }
    Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
    for (Long blockId : result.getSuccessBlockIds()) {
      succBlockIdBitmap.addLong(blockId);
    }
    // Each block targets both servers and the client is built with replicaWrite(1), so the live
    // server's acknowledgement is expected to be enough: no block should be reported as failed.
    assertEquals(0, failedBlockIdBitmap.getLongCardinality());
    assertEquals(blockIdBitmap, succBlockIdBitmap);

    // The commit includes the unreachable server and is expected to fail.
    boolean commitResult =
        shuffleWriteClientImpl.sendCommit(
            Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo), testAppId, 0, 2);
    assertFalse(commitResult);

    // Report the same blocks to both servers; the result read back is still expected to contain
    // every block, served by the live server.
    Map<Integer, Set<Long>> partitionToBlockIds = Maps.newHashMap();
    partitionToBlockIds.put(0, Sets.newHashSet(blockIdBitmap.stream().iterator()));
    Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = Maps.newHashMap();
    serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlockIds);
    serverToPartitionToBlockIds.put(fakeShuffleServerInfo, partitionToBlockIds);
    shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 0, 0, 2);
    Roaring64NavigableMap report =
        shuffleWriteClientImpl.getShuffleResult(
            ClientType.GRPC.name(),
            Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo),
            testAppId,
            0,
            0);
    assertEquals(blockIdBitmap, report);
  }

  /**
   * Reports five blocks of partition 1 to the first server only and checks that the server returns
   * exactly those blocks for partition 1 and nothing for partition 0.
   */
  @Test
  public void reportBlocksToShuffleServerIfNecessary() {
    String testAppId = "reportBlocksToShuffleServerIfNecessary_appId";
    registerSinglePartition(shuffleServerInfo1, testAppId, 1, 1);
    registerSinglePartition(shuffleServerInfo2, testAppId, 1, 2);

    int partitionIdx = 1;
    Set<Long> blockIds = blockIdsOf(5, partitionIdx);
    Map<Integer, Set<Long>> partitionToBlocks = Maps.newHashMap();
    partitionToBlocks.put(partitionIdx, blockIds);
    Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = Maps.newHashMap();
    serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks);

    shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 1, 0, 1);

    assertTrue(shuffleResultOf(shuffleServerInfo1, testAppId, 1, 0).isEmpty());
    assertContainsExactly(
        blockIds, shuffleResultOf(shuffleServerInfo1, testAppId, 1, partitionIdx));
  }

  /**
   * Reports partition 1 (5 blocks) to the first server and partition 2 (7 blocks) to the second in
   * one call. It then checks that each server returns only its own partition's blocks.
   */
  @Test
  public void reportMultipleServerTest() throws Exception {
    String testAppId = "reportMultipleServerTest";
    registerSinglePartition(shuffleServerInfo1, testAppId, 1, 1);
    registerSinglePartition(shuffleServerInfo2, testAppId, 1, 2);

    Set<Long> partition1BlockIds = blockIdsOf(5, 1);
    Map<Integer, Set<Long>> partitionToBlocks1 = Maps.newHashMap();
    partitionToBlocks1.put(1, partition1BlockIds);

    Set<Long> partition2BlockIds = blockIdsOf(7, 2);
    Map<Integer, Set<Long>> partitionToBlocks2 = Maps.newHashMap();
    partitionToBlocks2.put(2, partition2BlockIds);

    Map<ShuffleServerInfo, Map<Integer, Set<Long>>> serverToPartitionToBlockIds = Maps.newHashMap();
    serverToPartitionToBlockIds.put(shuffleServerInfo1, partitionToBlocks1);
    serverToPartitionToBlockIds.put(shuffleServerInfo2, partitionToBlocks2);
    shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, testAppId, 1, 0, 1);

    // Server 1 holds only partition 1.
    assertTrue(shuffleResultOf(shuffleServerInfo1, testAppId, 1, 0).isEmpty());
    assertContainsExactly(partition1BlockIds, shuffleResultOf(shuffleServerInfo1, testAppId, 1, 1));
    assertTrue(shuffleResultOf(shuffleServerInfo1, testAppId, 1, 2).isEmpty());

    // Server 2 holds only partition 2.
    assertTrue(shuffleResultOf(shuffleServerInfo2, testAppId, 1, 0).isEmpty());
    assertTrue(shuffleResultOf(shuffleServerInfo2, testAppId, 1, 1).isEmpty());
    assertContainsExactly(partition2BlockIds, shuffleResultOf(shuffleServerInfo2, testAppId, 1, 2));
  }

  /**
   * Writes blocks to both servers and checks visibility across commits:
   *
   * <ul>
   *   <li>nothing is readable after the first commit;
   *   <li>everything is readable after the second;
   *   <li>a commit to an unknown host fails;
   *   <li>a commit after the app's resources have expired fails.
   * </ul>
   */
  @Test
  public void writeReadTest() throws Exception {
    String testAppId = "writeReadTest";
    registerSinglePartition(shuffleServerInfo1, testAppId, 0, 0);
    registerSinglePartition(shuffleServerInfo2, testAppId, 0, 0);
    Map<Long, byte[]> expectedData = Maps.newHashMap();
    Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
    Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
    List<ShuffleBlockInfo> blocks =
        createShuffleBlockList(
            0,
            0,
            0,
            3,
            25,
            blockIdBitmap,
            expectedData,
            Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2));
    shuffleWriteClientImpl.sendShuffleData(testAppId, blocks, () -> false);

    // 1st commit: finish-shuffle is not yet sent to the servers, so the data has not been
    // persisted to disk and the local-file reader finds nothing.
    boolean commitResult =
        shuffleWriteClientImpl.sendCommit(
            Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
    assertTrue(commitResult);
    ShuffleReadClientImpl readClient = newReadClient(testAppId, blockIdBitmap, taskIdBitmap);
    try {
      assertNull(readClient.readShuffleBlockData());
    } finally {
      readClient.close();
    }

    // 2nd commit: the data is persisted to disk and every written block becomes readable.
    commitResult =
        shuffleWriteClientImpl.sendCommit(
            Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
    assertTrue(commitResult);
    readClient = newReadClient(testAppId, blockIdBitmap, taskIdBitmap);
    try {
      validateResult(readClient, expectedData);
      readClient.checkProcessedBlockIds();
    } finally {
      readClient.close();
    }

    // A commit to an unresolvable host ("fakeIp") fails.
    commitResult =
        shuffleWriteClientImpl.sendCommit(
            Sets.newHashSet(
                new ShuffleServerInfo("127.0.0.1-20001", "fakeIp", SHUFFLE_SERVER_PORT)),
            testAppId,
            0,
            2);
    assertFalse(commitResult);

    // Wait longer than rss.server.app.expired.withoutHeartbeat (4000) so the app's resources are
    // deleted on the servers.
    Thread.sleep(6000);

    // The commit RPC itself goes through, but the finish-shuffle RPC fails because the resources
    // were deleted, so the overall result is false.
    commitResult =
        shuffleWriteClientImpl.sendCommit(
            Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
    assertFalse(commitResult);
  }

  /**
   * Commits a task that wrote no data. Committing to the registered server succeeds, while
   * committing to a server the app was never registered on fails.
   */
  @Test
  public void emptyTaskTest() {
    String testAppId = "emptyTaskTest";
    registerSinglePartition(shuffleServerInfo1, testAppId, 0, 0);
    boolean commitResult =
        shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo1), testAppId, 0, 2);
    assertTrue(commitResult);
    commitResult =
        shuffleWriteClientImpl.sendCommit(Sets.newHashSet(shuffleServerInfo2), testAppId, 0, 2);
    assertFalse(commitResult);
  }

  /**
   * Asks the coordinator for assignments and registers the shuffle on each assigned server, via
   * {@link RetryUtils#retry}.
   *
   * <p>On every attempt except the last, the assigned server is stopped before registering, so the
   * registration is expected to fail and trigger another attempt.
   *
   * <p>NOTE(review): this stops shuffle servers that are started once in {@link
   * #setupServers(File)} and shared by every test in the class. Tests that JUnit runs after this
   * one may therefore fail. Consider isolating this test or restarting the servers.
   */
  @Test
  public void testRetryAssgin() throws Throwable {
    int maxTryTime = shuffleServers.size();
    AtomicInteger tryTime = new AtomicInteger();
    String appId = "app-1";
    RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
    ShuffleServerConf shuffleServerConf = getShuffleServerConf();
    int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
    // Give the servers two heartbeat intervals to report to the coordinator before asking it for
    // assignments.
    Thread.sleep(heartbeatInterval * 2);
    shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM);
    ShuffleAssignmentsInfo response =
        RetryUtils.retry(
            () -> {
              int currentTryTime = tryTime.incrementAndGet();
              ShuffleAssignmentsInfo shuffleAssignments =
                  shuffleWriteClientImpl.getShuffleAssignments(
                      appId, 1, 1, 1, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 1, -1);
              for (Map.Entry<ShuffleServerInfo, List<PartitionRange>> entry :
                  shuffleAssignments.getServerToPartitionRanges().entrySet()) {
                if (currentTryTime < maxTryTime) {
                  stopServersWithId(entry.getKey().getId());
                }
                shuffleWriteClientImpl.registerShuffle(
                    entry.getKey(),
                    appId,
                    0,
                    entry.getValue(),
                    remoteStorage,
                    ShuffleDataDistributionType.NORMAL,
                    -1);
              }
              return shuffleAssignments;
            },
            heartbeatInterval,
            maxTryTime);
    assertNotNull(response);
  }

  /** Returns the two data directories {@code first} and {@code second} under {@code parent}. */
  private static String localBasePath(File parent, String first, String second) {
    return new File(parent, first).getAbsolutePath()
        + ","
        + new File(parent, second).getAbsolutePath();
  }

  /**
   * Registers {@code shuffleId} of {@code appId} on {@code server} for the single partition {@code
   * partitionId}, with an empty remote storage path and NORMAL data distribution.
   */
  private void registerSinglePartition(
      ShuffleServerInfo server, String appId, int shuffleId, int partitionId) {
    shuffleWriteClientImpl.registerShuffle(
        server,
        appId,
        shuffleId,
        Lists.newArrayList(new PartitionRange(partitionId, partitionId)),
        new RemoteStorageInfo(""),
        ShuffleDataDistributionType.NORMAL,
        -1);
  }

  /** Builds a LOCALFILE read client for shuffle 0 / partition 0 of {@code appId} on both servers. */
  private static ShuffleReadClientImpl newReadClient(
      String appId, Roaring64NavigableMap blockIdBitmap, Roaring64NavigableMap taskIdBitmap) {
    return ShuffleClientFactory.newReadBuilder()
        .storageType(StorageType.LOCALFILE.name())
        .appId(appId)
        .shuffleId(0)
        .partitionId(0)
        .indexReadLimit(100)
        .partitionNumPerRange(1)
        .partitionNum(10)
        .readBufferSize(1000)
        .basePath("")
        .blockIdBitmap(blockIdBitmap)
        .taskIdBitmap(taskIdBitmap)
        .shuffleServerInfoList(Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2))
        .build();
  }

  /**
   * Returns the ids {@code BlockId.getBlockId(i, partitionId, 0)} for {@code i} in {@code [0,
   * count)}.
   */
  private static Set<Long> blockIdsOf(int count, int partitionId) {
    Set<Long> blockIds = Sets.newHashSet();
    for (int i = 0; i < count; i++) {
      blockIds.add(BlockId.getBlockId(i, partitionId, 0));
    }
    return blockIds;
  }

  /** Fetches, over gRPC, the reported block ids of one partition from a single server. */
  private Roaring64NavigableMap shuffleResultOf(
      ShuffleServerInfo server, String appId, int shuffleId, int partitionId) {
    return shuffleWriteClientImpl.getShuffleResult(
        ClientType.GRPC.name(), Sets.newHashSet(server), appId, shuffleId, partitionId);
  }

  /** Asserts that {@code actual} contains every id in {@code expected} and nothing else. */
  private static void assertContainsExactly(Set<Long> expected, Roaring64NavigableMap actual) {
    assertEquals(expected.size(), actual.getLongCardinality());
    for (Long blockId : expected) {
      assertTrue(actual.contains(blockId));
    }
  }

  /**
   * Stops every started shuffle server whose id equals {@code serverId}. A failure to stop is
   * printed and otherwise ignored (best effort), so the caller's retry loop keeps going.
   */
  private static void stopServersWithId(String serverId) {
    shuffleServers.forEach(
        server -> {
          if (server.getId().equals(serverId)) {
            try {
              server.stopServer();
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        });
  }
}