blob: 566d99e9d78430dedab71847207e709dc4d20cfe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.test;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.client.impl.ShuffleReadClientImpl;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RetryUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
private static ShuffleServerInfo shuffleServerInfo1;
private static ShuffleServerInfo shuffleServerInfo2;
private ShuffleWriteClientImpl shuffleWriteClientImpl;
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
CoordinatorConf coordinatorConf = getCoordinatorConf();
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.setString("rss.storage.type", StorageType.LOCALFILE.name());
shuffleServerConf.setString("rss.storage.basePath", basePath);
createShuffleServer(shuffleServerConf);
File dataDir3 = new File(tmpDir, "data3");
File dataDir4 = new File(tmpDir, "data4");
basePath = dataDir3.getAbsolutePath() + "," + dataDir4.getAbsolutePath();
shuffleServerConf.setString("rss.storage.basePath", basePath);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
createShuffleServer(shuffleServerConf);
startServers();
shuffleServerInfo1 =
new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT);
shuffleServerInfo2 =
new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(1).getIp(), SHUFFLE_SERVER_PORT + 1);
}
@BeforeEach
public void createClient() {
shuffleWriteClientImpl = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
1, 1, 1, true, 1, 1, 10, 10);
}
@AfterEach
public void closeClient() {
shuffleWriteClientImpl.close();
}
@Test
public void rpcFailTest() throws Exception {
String testAppId = "rpcFailTest";
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo1,
testAppId,
0,
Lists.newArrayList(new PartitionRange(0, 0)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
// simulator a failed server
ShuffleServerInfo fakeShuffleServerInfo =
new ShuffleServerInfo("127.0.0.1-20001", shuffleServers.get(0).getIp(), SHUFFLE_SERVER_PORT + 100);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 3, 25, blockIdBitmap,
expectedData, Lists.newArrayList(shuffleServerInfo1, fakeShuffleServerInfo));
SendShuffleDataResult result = shuffleWriteClientImpl.sendShuffleData(testAppId, blocks, () -> false);
Roaring64NavigableMap failedBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap succBlockIdBitmap = Roaring64NavigableMap.bitmapOf();
for (Long blockId : result.getFailedBlockIds()) {
failedBlockIdBitmap.addLong(blockId);
}
for (Long blockId : result.getSuccessBlockIds()) {
succBlockIdBitmap.addLong(blockId);
}
// There will no failed blocks when replica=2
assertEquals(failedBlockIdBitmap.getLongCardinality(), 0);
assertEquals(blockIdBitmap, succBlockIdBitmap);
boolean commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(
shuffleServerInfo1, fakeShuffleServerInfo), testAppId, 0, 2);
assertFalse(commitResult);
// Report will success when replica=2
Map<Integer, List<Long>> ptb = Maps.newHashMap();
ptb.put(0, Lists.newArrayList(blockIdBitmap.stream().iterator()));
Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
partitionToServers.put(0, Lists.newArrayList(
shuffleServerInfo1, fakeShuffleServerInfo));
shuffleWriteClientImpl.reportShuffleResult(partitionToServers, testAppId, 0, 0, ptb, 2);
Roaring64NavigableMap report = shuffleWriteClientImpl.getShuffleResult("GRPC",
Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo),
testAppId, 0, 0);
assertEquals(blockIdBitmap, report);
}
@Test
public void reportBlocksToShuffleServerIfNecessary() {
String testAppId = "reportBlocksToShuffleServerIfNecessary_appId";
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo1,
testAppId,
1,
Lists.newArrayList(new PartitionRange(1, 1)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo2,
testAppId,
1,
Lists.newArrayList(new PartitionRange(2, 2)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
partitionToServers.put(1, Lists.newArrayList(shuffleServerInfo1));
partitionToServers.put(2, Lists.newArrayList(shuffleServerInfo2));
Map<Integer, List<Long>> partitionToBlocks = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
int partitionIdx = 1;
for (int i = 0; i < 5; i++) {
blockIds.add(ClientUtils.getBlockId(partitionIdx, 0, i));
}
partitionToBlocks.put(partitionIdx, blockIds);
// case1
shuffleWriteClientImpl
.reportShuffleResult(partitionToServers, testAppId, 1, 0, partitionToBlocks, 1);
Roaring64NavigableMap bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
1, 0);
assertTrue(bitmap.isEmpty());
bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
1, partitionIdx);
assertEquals(5, bitmap.getLongCardinality());
for (int i = 0; i < 5; i++) {
assertTrue(bitmap.contains(partitionToBlocks.get(1).get(i)));
}
}
@Test
public void reportMultipleServerTest() throws Exception {
String testAppId = "reportMultipleServerTest";
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo1,
testAppId,
1,
Lists.newArrayList(new PartitionRange(1, 1)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo2,
testAppId,
1,
Lists.newArrayList(new PartitionRange(2, 2)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
partitionToServers.putIfAbsent(1, Lists.newArrayList(shuffleServerInfo1));
partitionToServers.putIfAbsent(2, Lists.newArrayList(shuffleServerInfo2));
Map<Integer, List<Long>> partitionToBlocks = Maps.newHashMap();
List<Long> blockIds = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
blockIds.add(ClientUtils.getBlockId(1, 0, i));
}
partitionToBlocks.put(1, blockIds);
blockIds = Lists.newArrayList();
for (int i = 0; i < 7; i++) {
blockIds.add(ClientUtils.getBlockId(2, 0, i));
}
partitionToBlocks.put(2, blockIds);
shuffleWriteClientImpl
.reportShuffleResult(partitionToServers, testAppId, 1, 0, partitionToBlocks, 1);
Roaring64NavigableMap bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
1, 0);
assertTrue(bitmap.isEmpty());
bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
1, 1);
assertEquals(5, bitmap.getLongCardinality());
for (int i = 0; i < 5; i++) {
assertTrue(bitmap.contains(partitionToBlocks.get(1).get(i)));
}
bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
1, 2);
assertTrue(bitmap.isEmpty());
bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId,
1, 0);
assertTrue(bitmap.isEmpty());
bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId,
1, 1);
assertTrue(bitmap.isEmpty());
bitmap = shuffleWriteClientImpl
.getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId,
1, 2);
assertEquals(7, bitmap.getLongCardinality());
for (int i = 0; i < 7; i++) {
assertTrue(bitmap.contains(partitionToBlocks.get(2).get(i)));
}
}
@Test
public void writeReadTest() throws Exception {
String testAppId = "writeReadTest";
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo1,
testAppId,
0,
Lists.newArrayList(new PartitionRange(0, 0)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo2,
testAppId,
0,
Lists.newArrayList(new PartitionRange(0, 0)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
List<ShuffleBlockInfo> blocks = createShuffleBlockList(
0, 0, 0, 3, 25, blockIdBitmap,
expectedData, Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2));
shuffleWriteClientImpl.sendShuffleData(testAppId, blocks, () -> false);
// send 1st commit, finish commit won't be sent to Shuffle server and data won't be persisted to disk
boolean commitResult = shuffleWriteClientImpl
.sendCommit(Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
assertTrue(commitResult);
ShuffleReadClientImpl readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1,
10, 1000, "", blockIdBitmap, taskIdBitmap,
Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
assertNull(readClient.readShuffleBlockData());
readClient.close();
// send 2nd commit, data will be persisted to disk
commitResult = shuffleWriteClientImpl
.sendCommit(Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
assertTrue(commitResult);
readClient = new ShuffleReadClientImpl(StorageType.LOCALFILE.name(), testAppId, 0, 0, 100, 1,
10, 1000, "", blockIdBitmap, taskIdBitmap,
Lists.newArrayList(shuffleServerInfo1, shuffleServerInfo2), null, new DefaultIdHelper());
validateResult(readClient, expectedData);
readClient.checkProcessedBlockIds();
readClient.close();
// commit will be failed because of fakeIp
commitResult = shuffleWriteClientImpl.sendCommit(Sets.newHashSet(new ShuffleServerInfo(
"127.0.0.1-20001", "fakeIp", SHUFFLE_SERVER_PORT)), testAppId, 0, 2);
assertFalse(commitResult);
// wait resource to be deleted
Thread.sleep(6000);
// commit is ok, but finish shuffle rpc will failed because resource was deleted
commitResult = shuffleWriteClientImpl
.sendCommit(Sets.newHashSet(shuffleServerInfo1, shuffleServerInfo2), testAppId, 0, 2);
assertFalse(commitResult);
}
@Test
public void emptyTaskTest() {
String testAppId = "emptyTaskTest";
shuffleWriteClientImpl.registerShuffle(
shuffleServerInfo1,
testAppId,
0,
Lists.newArrayList(new PartitionRange(0, 0)),
new RemoteStorageInfo(""),
ShuffleDataDistributionType.NORMAL,
-1
);
boolean commitResult = shuffleWriteClientImpl
.sendCommit(Sets.newHashSet(shuffleServerInfo1), testAppId, 0, 2);
assertTrue(commitResult);
commitResult = shuffleWriteClientImpl
.sendCommit(Sets.newHashSet(shuffleServerInfo2), testAppId, 0, 2);
assertFalse(commitResult);
}
@Test
public void testRetryAssgin() throws Throwable {
int maxTryTime = shuffleServers.size();
AtomicInteger tryTime = new AtomicInteger();
String appId = "app-1";
RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
ShuffleAssignmentsInfo response = null;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
Thread.sleep(heartbeatInterval * 2);
shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM);
response = RetryUtils.retry(() -> {
int currentTryTime = tryTime.incrementAndGet();
ShuffleAssignmentsInfo shuffleAssignments = shuffleWriteClientImpl.getShuffleAssignments(appId,
1, 1, 1, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 1, -1);
Map<ShuffleServerInfo, List<PartitionRange>> serverToPartitionRanges =
shuffleAssignments.getServerToPartitionRanges();
serverToPartitionRanges.entrySet().forEach(entry -> {
if (currentTryTime < maxTryTime) {
shuffleServers.forEach((ss) -> {
if (ss.getId().equals(entry.getKey().getId())) {
try {
ss.stopServer();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
shuffleWriteClientImpl.registerShuffle(
entry.getKey(),
appId,
0,
entry.getValue(),
remoteStorage,
ShuffleDataDistributionType.NORMAL,
-1
);
});
return shuffleAssignments;
}, heartbeatInterval, maxTryTime);
assertNotNull(response);
}
}