// blob: 280de66a69faeba261cf07de2a2119c6d3b0dbd6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.test;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.protobuf.ByteString;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
import org.apache.uniffle.client.request.RssAppHeartBeatRequest;
import org.apache.uniffle.client.request.RssFinishShuffleRequest;
import org.apache.uniffle.client.request.RssGetShuffleDataRequest;
import org.apache.uniffle.client.request.RssGetShuffleIndexRequest;
import org.apache.uniffle.client.request.RssGetShuffleResultRequest;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssReportShuffleResultRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssGetShuffleResultResponse;
import org.apache.uniffle.client.response.RssReportShuffleResultResponse;
import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerGrpcMetrics;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleServerGrpcTest extends IntegrationTestBase {
// Client under test; assigned by createClient() before every test method.
private ShuffleServerGrpcClient shuffleServerClient;
// Supplies the increasing sequence numbers that getBlockIdList() feeds into the block ids.
private final AtomicInteger atomicInteger = new AtomicInteger(0);
// Value configured as FLUSH_COLD_STORAGE_THRESHOLD_SIZE on the test shuffle server.
private static final Long EVENT_THRESHOLD_SIZE = 2048L;
// 1024^3; used by rpcMetricsTest as a pre-allocation size that is expected to be refused.
private static final int GB = 1024 * 1024 * 1024;
/**
 * Starts the coordinator and the shuffle server that all tests in this class share.
 *
 * <p>The shuffle server uses MEMORY_LOCALFILE_HDFS storage rooted in a temporary directory,
 * has RPC metrics enabled, and is configured with short expiry settings (2000 for apps
 * without heartbeat, 5000 for pre-allocations; presumably milliseconds - confirm against
 * ShuffleServerConf) so that the expiry-related tests finish quickly.
 *
 * @throws Exception if a server cannot be created or started
 */
@BeforeAll
public static void setupServers() throws Exception {
  CoordinatorConf coordinatorConf = getCoordinatorConf();
  coordinatorConf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
  createCoordinatorServer(coordinatorConf);

  ShuffleServerConf shuffleServerConf = getShuffleServerConf();
  File tmpDir = Files.createTempDir();
  File dataDir1 = new File(tmpDir, "data1");
  // Best-effort cleanup: File.deleteOnExit() removes paths in reverse registration order and
  // only removes a directory if it is empty at JVM exit, so the parent is registered first.
  tmpDir.deleteOnExit();
  dataDir1.deleteOnExit();
  String basePath = dataDir1.getAbsolutePath();
  shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
  shuffleServerConf.set(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, EVENT_THRESHOLD_SIZE);
  shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
  shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
  shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  shuffleServerConf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 5000L);
  createShuffleServer(shuffleServerConf);

  startServers();
}
/** Creates a fresh client connected to the local shuffle server before each test. */
@BeforeEach
public void createClient() {
  shuffleServerClient = new ShuffleServerGrpcClient(LOCALHOST, SHUFFLE_SERVER_PORT);
}

/**
 * Closes the client created for the finished test so that whatever connection resources it
 * holds are not leaked from one test method to the next.
 */
@AfterEach
public void closeClient() {
  if (shuffleServerClient != null) {
    shuffleServerClient.close();
    shuffleServerClient = null;
  }
}
/**
 * Checks that applications disappear from the coordinator and from the shuffle server once
 * their heartbeats stop, while an application that keeps sending heartbeats stays registered.
 *
 * <p>The write client and the background heartbeat thread are always released in a
 * {@code finally} block, so a failing assertion cannot leave a thread behind that keeps
 * sending heartbeats while other tests run.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void clearResourceTest() throws Exception {
  final ShuffleWriteClient shuffleWriteClient =
      ShuffleClientFactory.getInstance().createShuffleWriteClient(
          "GRPC", 2, 10000L, 4, 1, 1, 1, true, 1, 1);
  // Thread will keep refresh clearResourceTest1 in coordinator.
  // It is created (but not started) up front so the finally block can always stop it.
  final Thread heartbeatThread = new Thread(() -> {
    int i = 0;
    while (i < 20) {
      shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L);
      i++;
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {
        // interrupted by the test: stop sending heartbeats
        return;
      }
    }
  });

  try {
    shuffleWriteClient.registerCoordinators("127.0.0.1:19999");
    shuffleWriteClient.registerShuffle(
        new ShuffleServerInfo("127.0.0.1-20001", "127.0.0.1", 20001),
        "clearResourceTest1",
        0,
        Lists.newArrayList(new PartitionRange(0, 1)), new RemoteStorageInfo(""));
    shuffleWriteClient.sendAppHeartbeat("clearResourceTest1", 1000L);
    shuffleWriteClient.sendAppHeartbeat("clearResourceTest2", 1000L);

    RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest("clearResourceTest1", 0,
        Lists.newArrayList(new PartitionRange(0, 1)), "");
    shuffleServerClient.registerShuffle(rrsr);
    rrsr = new RssRegisterShuffleRequest("clearResourceTest2", 0,
        Lists.newArrayList(new PartitionRange(0, 1)), "");
    shuffleServerClient.registerShuffle(rrsr);
    assertEquals(Sets.newHashSet("clearResourceTest1", "clearResourceTest2"),
        shuffleServers.get(0).getShuffleTaskManager().getAppIds());

    heartbeatThread.start();

    // Heartbeat is sent to coordinator too
    Thread.sleep(3000);
    shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest("clearResourceTest1", 0,
        Lists.newArrayList(new PartitionRange(0, 1)), ""));
    assertEquals(Sets.newHashSet("clearResourceTest1"),
        coordinators.get(0).getApplicationManager().getAppIds());

    // clearResourceTest2 will be removed because of rss.server.app.expired.withoutHeartbeat
    Thread.sleep(2000);
    assertEquals(Sets.newHashSet("clearResourceTest1"),
        shuffleServers.get(0).getShuffleTaskManager().getAppIds());

    // clearResourceTest1 will be removed because of rss.server.app.expired.withoutHeartbeat
    heartbeatThread.interrupt();
    Thread.sleep(8000);
    assertEquals(0, shuffleServers.get(0).getShuffleTaskManager().getAppIds().size());
  } finally {
    try {
      // No-op if the thread was never started or has already finished.
      heartbeatThread.interrupt();
      heartbeatThread.join(5000);
    } finally {
      shuffleWriteClient.close();
    }
  }
}
/**
 * Exercises reportShuffleResult / getShuffleResult: rejection before the app is registered,
 * empty results right after registration, accumulation over several reports, reports using
 * more than one bitmap, the maximum partition id, and rejection again after the waiting
 * period at the end of the test.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void shuffleResultTest() throws Exception {
  final String appId = "shuffleResultTest";

  Map<Integer, List<Long>> blockIdsByPartition = Maps.newHashMap();
  List<Long> idsP1 = getBlockIdList(1, 3);
  List<Long> idsP2 = getBlockIdList(2, 2);
  List<Long> idsP3 = getBlockIdList(3, 1);
  blockIdsByPartition.put(1, idsP1);
  blockIdsByPartition.put(2, idsP2);
  blockIdsByPartition.put(3, idsP3);

  // Nothing has been registered for this app yet: both report and get must fail.
  RssReportShuffleResultRequest reportRequest =
      new RssReportShuffleResultRequest(appId, 0, 0L, blockIdsByPartition, 1);
  try {
    shuffleServerClient.reportShuffleResult(reportRequest);
    fail("Exception should be thrown");
  } catch (Exception e) {
    assertTrue(e.getMessage().contains("error happened when report shuffle result"));
  }
  assertShuffleResultUnavailable(appId, 1, 1);

  shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(appId, 100,
      Lists.newArrayList(new PartitionRange(0, 1)), ""));

  // After registration an unreported partition yields an empty bitmap.
  assertEquals(Roaring64NavigableMap.bitmapOf(), readBlockIdBitmap(appId, 0, 1));

  reportRequest = new RssReportShuffleResultRequest(appId, 0, 0L, blockIdsByPartition, 1);
  RssReportShuffleResultResponse reportResponse =
      shuffleServerClient.reportShuffleResult(reportRequest);
  assertEquals(ResponseStatusCode.SUCCESS, reportResponse.getStatusCode());

  Roaring64NavigableMap expectedP1 = bitmapOfBlockIds(idsP1);
  assertEquals(expectedP1, readBlockIdBitmap(appId, 0, 1));
  Roaring64NavigableMap expectedP2 = bitmapOfBlockIds(idsP2);
  assertEquals(expectedP2, readBlockIdBitmap(appId, 0, 2));
  Roaring64NavigableMap expectedP3 = bitmapOfBlockIds(idsP3);
  assertEquals(expectedP3, readBlockIdBitmap(appId, 0, 3));

  // A second report for the same shuffle adds to the ids that are already stored.
  blockIdsByPartition = Maps.newHashMap();
  idsP1 = getBlockIdList(1, 3);
  idsP2 = getBlockIdList(2, 2);
  idsP3 = getBlockIdList(3, 1);
  blockIdsByPartition.put(1, idsP1);
  blockIdsByPartition.put(2, idsP2);
  blockIdsByPartition.put(3, idsP3);
  shuffleServerClient.reportShuffleResult(
      new RssReportShuffleResultRequest(appId, 0, 1L, blockIdsByPartition, 1));

  addExpectedBlockIds(expectedP1, idsP1);
  assertEquals(expectedP1, readBlockIdBitmap(appId, 0, 1));
  addExpectedBlockIds(expectedP2, idsP2);
  assertEquals(expectedP2, readBlockIdBitmap(appId, 0, 2));
  addExpectedBlockIds(expectedP3, idsP3);
  assertEquals(expectedP3, readBlockIdBitmap(appId, 0, 3));

  // Reporting an empty map for another shuffle id leaves its result empty.
  shuffleServerClient.reportShuffleResult(
      new RssReportShuffleResultRequest(appId, 1, 1L, Maps.newHashMap(), 1));
  assertEquals(Roaring64NavigableMap.bitmapOf(), readBlockIdBitmap(appId, 1, 1));

  // test with bitmapNum > 1
  blockIdsByPartition = Maps.newHashMap();
  idsP1 = getBlockIdList(1, 3);
  idsP2 = getBlockIdList(2, 2);
  idsP3 = getBlockIdList(3, 1);
  blockIdsByPartition.put(1, idsP1);
  blockIdsByPartition.put(2, idsP2);
  blockIdsByPartition.put(3, idsP3);
  shuffleServerClient.reportShuffleResult(
      new RssReportShuffleResultRequest(appId, 2, 1L, blockIdsByPartition, 3));
  // validate bitmap in shuffleTaskManager
  Roaring64NavigableMap[] storedBitmaps = shuffleServers.get(0).getShuffleTaskManager()
      .getPartitionsToBlockIds().get(appId).get(2);
  assertEquals(3, storedBitmaps.length);

  assertEquals(bitmapOfBlockIds(idsP1), readBlockIdBitmap(appId, 2, 1));
  assertEquals(bitmapOfBlockIds(idsP2), readBlockIdBitmap(appId, 2, 2));
  assertEquals(bitmapOfBlockIds(idsP3), readBlockIdBitmap(appId, 2, 3));

  // Same checks with the largest partition id.
  final int maxPartitionId = (int) Constants.MAX_PARTITION_ID;
  blockIdsByPartition = Maps.newHashMap();
  idsP1 = getBlockIdList(maxPartitionId, 3);
  idsP2 = getBlockIdList(2, 2);
  idsP3 = getBlockIdList(3, 1);
  blockIdsByPartition.put(maxPartitionId, idsP1);
  blockIdsByPartition.put(2, idsP2);
  blockIdsByPartition.put(3, idsP3);
  // bitmapNum = 2
  shuffleServerClient.reportShuffleResult(
      new RssReportShuffleResultRequest(appId, 4, 1L, blockIdsByPartition, 2));

  assertEquals(bitmapOfBlockIds(idsP1), readBlockIdBitmap(appId, 4, maxPartitionId));
  assertEquals(bitmapOfBlockIds(idsP2), readBlockIdBitmap(appId, 4, 2));
  assertEquals(bitmapOfBlockIds(idsP3), readBlockIdBitmap(appId, 4, 3));

  // wait resources are deleted
  Thread.sleep(12000);
  assertShuffleResultUnavailable(appId, 1, 1);
}

/** Fetches the block-id bitmap the server returns for one partition of a shuffle. */
private Roaring64NavigableMap readBlockIdBitmap(String appId, int shuffleId, int partitionId) {
  RssGetShuffleResultRequest getRequest =
      new RssGetShuffleResultRequest(appId, shuffleId, partitionId);
  RssGetShuffleResultResponse getResponse = shuffleServerClient.getShuffleResult(getRequest);
  return getResponse.getBlockIdBitmap();
}

/** Returns a new bitmap containing exactly the given block ids. */
private Roaring64NavigableMap bitmapOfBlockIds(List<Long> blockIds) {
  Roaring64NavigableMap bitmap = Roaring64NavigableMap.bitmapOf();
  addExpectedBlockIds(bitmap, blockIds);
  return bitmap;
}

/** Asserts that getShuffleResult fails with the "Can't get shuffle result" error. */
private void assertShuffleResultUnavailable(String appId, int shuffleId, int partitionId) {
  RssGetShuffleResultRequest getRequest =
      new RssGetShuffleResultRequest(appId, shuffleId, partitionId);
  try {
    shuffleServerClient.getShuffleResult(getRequest);
    fail("Exception should be thrown");
  } catch (Exception e) {
    assertTrue(e.getMessage().contains("Can't get shuffle result"));
  }
}
/**
 * Registers shuffles with one and with several partition ranges, then checks that the remote
 * storage path recorded for an app is the one supplied by its first registration.
 */
@Test
public void registerTest() {
  final String appId = "registerTest";

  // Shuffle 0 is registered with a single range covering partitions 0 and 1.
  shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(appId, 0,
      Lists.newArrayList(new PartitionRange(0, 1)), ""));
  // no exception with getShuffleResult means register successfully
  for (int partitionId = 0; partitionId <= 1; partitionId++) {
    shuffleServerClient.getShuffleResult(new RssGetShuffleResultRequest(appId, 0, partitionId));
  }

  // Shuffle 1 is registered with three single-partition ranges.
  shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(appId, 1,
      Lists.newArrayList(new PartitionRange(0, 0), new PartitionRange(1, 1), new PartitionRange(2, 2)), ""));
  for (int partitionId = 0; partitionId <= 2; partitionId++) {
    shuffleServerClient.getShuffleResult(new RssGetShuffleResultRequest(appId, 1, partitionId));
  }

  // registerShuffle with remote storage
  final String registeredApp = "remote_storage_register_app1";
  final String unknownApp = "remote_storage_register_app2";
  final String storagePath = "hdfs://cluster1";
  shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(registeredApp, 0,
      Lists.newArrayList(new PartitionRange(0, 1)), storagePath));

  // The events are sized one above the threshold configured in setupServers().
  ShuffleDataFlushEvent registeredEvent = new ShuffleDataFlushEvent(1, registeredApp, 1, 1, 1,
      EVENT_THRESHOLD_SIZE + 1, null, null, null);
  assertEquals(storagePath,
      shuffleServers.get(0).getStorageManager().selectStorage(registeredEvent).getStoragePath());

  ShuffleDataFlushEvent unknownEvent = new ShuffleDataFlushEvent(1, unknownApp, 1, 1, 1,
      EVENT_THRESHOLD_SIZE + 1, null, null, null);
  try {
    // can't find storage info with appId2
    shuffleServers.get(0).getStorageManager().selectStorage(unknownEvent).getStoragePath();
    fail("Exception should be thrown with un-register appId");
  } catch (Exception e) {
    // expected exception, ignore
  }

  // appId -> remote storage won't change if register again with the different remote storage
  shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(registeredApp, 0,
      Lists.newArrayList(new PartitionRange(0, 1)), storagePath + "another"));
  assertEquals(storagePath,
      shuffleServers.get(0).getStorageManager().selectStorage(registeredEvent).getStoragePath());
}
/**
 * Sends one block for an app that was never registered and checks that the memory
 * pre-allocated for the request (132) is back to 0 after the waiting period.
 *
 * @throws Exception if the test thread is interrupted while waiting
 */
@Test
public void sendDataWithoutRegisterTest() throws Exception {
  ShuffleBlockInfo singleBlock = new ShuffleBlockInfo(0, 0, 0, 100, 0,
      new byte[]{}, Lists.newArrayList(), 0, 100, 0);
  List<ShuffleBlockInfo> blocksOfPartition0 = Lists.newArrayList(singleBlock);

  Map<Integer, List<ShuffleBlockInfo>> blocksByPartition = Maps.newHashMap();
  blocksByPartition.put(0, blocksOfPartition0);
  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> blocksByShuffle = Maps.newHashMap();
  blocksByShuffle.put(0, blocksByPartition);

  RssSendShuffleDataRequest sendRequest = new RssSendShuffleDataRequest(
      "sendDataWithoutRegisterTest", 3, 1000, blocksByShuffle);
  shuffleServerClient.sendShuffleData(sendRequest);
  assertEquals(132, shuffleServers.get(0).getPreAllocatedMemory());

  // Wait longer than SERVER_PRE_ALLOCATION_EXPIRED (set to 5000L in setupServers()).
  Thread.sleep(10000);
  assertEquals(0, shuffleServers.get(0).getPreAllocatedMemory());
}
/**
 * Sends shuffle data through the raw blocking stub with a requireBufferId (10000) that was
 * never handed out and checks that the server answers INTERNAL_ERROR with a message naming
 * the unknown id.
 *
 * @throws Exception declared for parity with the other tests; the body adds no checked calls
 */
@Test
public void sendDataWithoutRequirePreAllocation() throws Exception {
  String appId = "sendDataWithoutRequirePreAllocation";
  List<ShuffleBlockInfo> blockInfos = Lists.newArrayList(new ShuffleBlockInfo(0, 0, 0, 100, 0,
      new byte[]{}, Lists.newArrayList(), 0, 100, 0));
  Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
  partitionToBlocks.put(0, blockInfos);
  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
  shuffleToBlocks.put(0, partitionToBlocks);

  for (Map.Entry<Integer, Map<Integer, List<ShuffleBlockInfo>>> stb : shuffleToBlocks.entrySet()) {
    // Convert every partition's blocks into their protobuf form.
    List<RssProtos.ShuffleData> shuffleData = Lists.newArrayList();
    for (Map.Entry<Integer, List<ShuffleBlockInfo>> ptb : stb.getValue().entrySet()) {
      List<RssProtos.ShuffleBlock> shuffleBlocks = Lists.newArrayList();
      for (ShuffleBlockInfo sbi : ptb.getValue()) {
        shuffleBlocks.add(RssProtos.ShuffleBlock.newBuilder().setBlockId(sbi.getBlockId())
            .setCrc(sbi.getCrc())
            .setLength(sbi.getLength())
            .setTaskAttemptId(sbi.getTaskAttemptId())
            .setUncompressLength(sbi.getUncompressLength())
            .setData(ByteString.copyFrom(sbi.getData()))
            .build());
      }
      shuffleData.add(RssProtos.ShuffleData.newBuilder().setPartitionId(ptb.getKey())
          .addAllBlock(shuffleBlocks)
          .build());
    }

    RssProtos.SendShuffleDataRequest rpcRequest = RssProtos.SendShuffleDataRequest.newBuilder()
        .setAppId(appId)
        .setShuffleId(0)
        .setRequireBufferId(10000)
        .addAllShuffleData(shuffleData)
        .build();
    RssProtos.SendShuffleDataResponse response =
        shuffleServerClient.getBlockingStub().sendShuffleData(rpcRequest);
    // assertEquals instead of assertTrue(a.equals(b)): a failure then shows the actual status.
    assertEquals(RssProtos.StatusCode.INTERNAL_ERROR, response.getStatus());
    assertTrue(response.getRetMsg().contains("Can't find requireBufferId[10000]"));
  }
}
/**
 * Reports block ids for the same shuffle and partition from three threads at once and checks
 * that the server returns the union of everything that was reported.
 *
 * @throws Exception if the test thread is interrupted while joining the reporters
 */
@Test
public void multipleShuffleResultTest() throws Exception {
  Set<Long> expectedBlockIds = Sets.newConcurrentHashSet();
  shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest("multipleShuffleResultTest", 100,
      Lists.newArrayList(new PartitionRange(0, 1)), ""));

  // One reporter per task attempt id (0, 1, 2), each covering 100 sequence numbers.
  List<Thread> reporters = Lists.newArrayList();
  reporters.add(new Thread(newResultReporter(expectedBlockIds, 0, 0, 100)));
  reporters.add(new Thread(newResultReporter(expectedBlockIds, 1, 100, 200)));
  reporters.add(new Thread(newResultReporter(expectedBlockIds, 2, 200, 300)));
  for (Thread reporter : reporters) {
    reporter.start();
  }
  for (Thread reporter : reporters) {
    reporter.join();
  }

  Roaring64NavigableMap expectedBitmap = Roaring64NavigableMap.bitmapOf();
  for (Long blockId : expectedBlockIds) {
    expectedBitmap.addLong(blockId);
  }

  RssGetShuffleResultResponse result = shuffleServerClient.getShuffleResult(
      new RssGetShuffleResultRequest("multipleShuffleResultTest", 1, 1));
  assertEquals(expectedBitmap, result.getBlockIdBitmap());
}

/**
 * Creates a task that reports one block id per sequence number in {@code [fromSeq, toSeq)}
 * for shuffle 1, partition 1 of "multipleShuffleResultTest". Each id is added to
 * {@code reportedBlockIds} before it is sent to the server.
 */
private Runnable newResultReporter(
    Set<Long> reportedBlockIds, int taskAttemptId, int fromSeq, int toSeq) {
  return () -> {
    for (int seq = fromSeq; seq < toSeq; seq++) {
      Long blockId = ClientUtils.getBlockId(1, taskAttemptId, seq);
      reportedBlockIds.add(blockId);

      List<Long> blockIds = Lists.newArrayList();
      blockIds.add(blockId);
      Map<Integer, List<Long>> blockIdsByPartition = Maps.newHashMap();
      blockIdsByPartition.put(1, blockIds);

      shuffleServerClient.reportShuffleResult(new RssReportShuffleResultRequest(
          "multipleShuffleResultTest", 1, taskAttemptId, blockIdsByPartition, 1));
    }
  };
}
/**
 * Calls each shuffle-server RPC once and checks that the matching per-method counter grew by
 * one and the matching gauge is back at 0, then checks the overall gRPC total and the counter
 * of failed buffer requirements.
 *
 * <p>Currently disabled as flaky.
 */
@Disabled("flaky test")
@Test
public void rpcMetricsTest() {
  String appId = "rpcMetricsTest";
  int shuffleId = 0;
  final double oldGrpcTotal = shuffleServers.get(0).getGrpcMetrics().getCounterGrpcTotal().get();

  // registerShuffle
  double oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap()
      .get(ShuffleServerGrpcMetrics.REGISTER_SHUFFLE_METHOD).get();
  shuffleServerClient.registerShuffle(new RssRegisterShuffleRequest(appId, shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)), ""));
  double newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap()
      .get(ShuffleServerGrpcMetrics.REGISTER_SHUFFLE_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.REGISTER_SHUFFLE_METHOD).get(), 0.5);

  // appHeartbeat
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.APP_HEARTBEAT_METHOD).get();
  shuffleServerClient.sendHeartBeat(new RssAppHeartBeatRequest(appId, 10000));
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.APP_HEARTBEAT_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.APP_HEARTBEAT_METHOD).get(), 0.5);

  // requireBuffer
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.REQUIRE_BUFFER_METHOD).get();
  shuffleServerClient.requirePreAllocation(100, 10, 1000);
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.REQUIRE_BUFFER_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.REQUIRE_BUFFER_METHOD).get(), 0.5);

  // sendShuffleData
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get();
  List<ShuffleBlockInfo> blockInfos = Lists.newArrayList(new ShuffleBlockInfo(shuffleId, 0, 0, 100, 0,
      new byte[]{}, Lists.newArrayList(), 0, 100, 0));
  Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap();
  partitionToBlocks.put(0, blockInfos);
  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap();
  shuffleToBlocks.put(0, partitionToBlocks);
  RssSendShuffleDataRequest rssdr = new RssSendShuffleDataRequest(
      appId, 3, 1000, shuffleToBlocks);
  shuffleServerClient.sendShuffleData(rssdr);
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.SEND_SHUFFLE_DATA_METHOD).get(), 0.5);

  // commitShuffleTask
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.COMMIT_SHUFFLE_TASK_METHOD).get();
  shuffleServerClient.sendCommit(new RssSendCommitRequest(appId, shuffleId));
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.COMMIT_SHUFFLE_TASK_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.COMMIT_SHUFFLE_TASK_METHOD).get(), 0.5);

  // finishShuffle
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.FINISH_SHUFFLE_METHOD).get();
  shuffleServerClient.finishShuffle(new RssFinishShuffleRequest(appId, shuffleId));
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.FINISH_SHUFFLE_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.FINISH_SHUFFLE_METHOD).get(), 0.5);

  // reportShuffleResult
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.REPORT_SHUFFLE_RESULT_METHOD).get();
  Map<Integer, List<Long>> partitionToBlockIds = Maps.newHashMap();
  List<Long> blockIds1 = getBlockIdList(1, 3);
  List<Long> blockIds2 = getBlockIdList(2, 2);
  List<Long> blockIds3 = getBlockIdList(3, 1);
  partitionToBlockIds.put(1, blockIds1);
  partitionToBlockIds.put(2, blockIds2);
  partitionToBlockIds.put(3, blockIds3);
  RssReportShuffleResultRequest request =
      new RssReportShuffleResultRequest(appId, shuffleId, 0L, partitionToBlockIds, 1);
  shuffleServerClient.reportShuffleResult(request);
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.REPORT_SHUFFLE_RESULT_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.REPORT_SHUFFLE_RESULT_METHOD).get(), 0.5);

  // getShuffleResult
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.GET_SHUFFLE_RESULT_METHOD).get();
  shuffleServerClient.getShuffleResult(new RssGetShuffleResultRequest(appId, shuffleId, 1));
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.GET_SHUFFLE_RESULT_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.GET_SHUFFLE_RESULT_METHOD).get(), 0.5);

  // getShuffleIndex
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD).get();
  try {
    shuffleServerClient.getShuffleIndex(new RssGetShuffleIndexRequest(
        appId, shuffleId, 1, 1, 3));
  } catch (Exception e) {
    // ignore the exception, just test metrics value
  }
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.GET_SHUFFLE_INDEX_METHOD).get(), 0.5);

  // getShuffleData
  oldValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get();
  try {
    shuffleServerClient.getShuffleData(new RssGetShuffleDataRequest(
        appId, shuffleId, 0, 1, 3,
        0, 100));
  } catch (Exception e) {
    // ignore the exception, just test metrics value
  }
  newValue = shuffleServers.get(0).getGrpcMetrics().getCounterMap().get(
      ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get();
  assertEquals(oldValue + 1, newValue, 0.5);
  assertEquals(0,
      shuffleServers.get(0).getGrpcMetrics().getGaugeMap().get(
          ShuffleServerGrpcMetrics.GET_SHUFFLE_DATA_METHOD).get(), 0.5);

  double newGrpcTotal = shuffleServers.get(0).getGrpcMetrics().getCounterGrpcTotal().get();
  // require buffer will be called one more time when send data
  assertEquals(oldGrpcTotal + 11, newGrpcTotal, 0.5);
  assertEquals(0, shuffleServers.get(0).getGrpcMetrics().getGaugeGrpcOpen().get(), 0.5);

  // Expected value first, actual second, so failure messages read correctly. The long literal
  // keeps both sides the same type even if the Object overload of the assertion is chosen.
  oldValue = ShuffleServerMetrics.counterTotalRequireBufferFailed.get();
  // the next two allocations will fail
  assertEquals(-1L, shuffleServerClient.requirePreAllocation(GB, 0, 10));
  assertEquals(-1L, shuffleServerClient.requirePreAllocation(GB, 0, 10));
  // the next two allocations will succeed
  assertNotEquals(-1L, shuffleServerClient.requirePreAllocation(10, 0, 10));
  assertNotEquals(-1L, shuffleServerClient.requirePreAllocation(10, 0, 10));
  newValue = ShuffleServerMetrics.counterTotalRequireBufferFailed.get();
  assertEquals((int) oldValue + 2, (int) newValue);
}
/**
 * Builds {@code blockNum} block ids for {@code partitionId}, using 0 as the second argument
 * of {@code ClientUtils.getBlockId} and the next values of the shared counter as the third.
 *
 * @param partitionId partition the ids are generated for
 * @param blockNum number of ids to generate; nothing is generated when it is not positive
 * @return the generated ids, in generation order
 */
private List<Long> getBlockIdList(int partitionId, int blockNum) {
  List<Long> generated = Lists.newArrayList();
  int remaining = blockNum;
  while (remaining > 0) {
    long blockId = ClientUtils.getBlockId(partitionId, 0, atomicInteger.getAndIncrement());
    generated.add(blockId);
    remaining--;
  }
  return generated;
}
/**
 * Adds every id in {@code blockIds} to {@code bitmap}.
 *
 * @param bitmap bitmap that is modified in place
 * @param blockIds ids to add
 */
private void addExpectedBlockIds(Roaring64NavigableMap bitmap, List<Long> blockIds) {
  for (Long blockId : blockIds) {
    bitmap.addLong(blockId);
  }
}
}