| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.uniffle.server; |
| |
| import java.io.File; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Stream; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import org.junit.jupiter.api.AfterAll; |
| import org.junit.jupiter.api.AfterEach; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.io.TempDir; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.Arguments; |
| import org.junit.jupiter.params.provider.MethodSource; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient; |
| import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient; |
| import org.apache.uniffle.client.request.RssRegisterShuffleRequest; |
| import org.apache.uniffle.client.request.RssSendShuffleDataRequest; |
| import org.apache.uniffle.client.response.RssRegisterShuffleResponse; |
| import org.apache.uniffle.client.response.RssSendShuffleDataResponse; |
| import org.apache.uniffle.common.PartitionRange; |
| import org.apache.uniffle.common.ShuffleBlockInfo; |
| import org.apache.uniffle.common.metrics.TestUtils; |
| import org.apache.uniffle.common.rpc.ServerType; |
| import org.apache.uniffle.common.rpc.StatusCode; |
| import org.apache.uniffle.common.util.RssUtils; |
| import org.apache.uniffle.storage.util.StorageType; |
| |
| import static org.junit.jupiter.api.Assertions.assertSame; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| |
| public class TopNShuffleDataSizeOfAppCalcTaskTest { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(TopNShuffleDataSizeOfAppCalcTaskTest.class); |
| |
| protected static List<ShuffleServer> shuffleServers = Lists.newArrayList(); |
| private static ShuffleServerConf grpcShuffleServerConfig; |
| private static ShuffleServerConf nettyShuffleServerConfig; |
| private static final Long EVENT_THRESHOLD_SIZE = 2048L; |
| protected static final int SHUFFLE_SERVER_PORT = 20001; |
| |
| private static AtomicInteger serverRpcPortCounter = new AtomicInteger(); |
| private static AtomicInteger nettyPortCounter = new AtomicInteger(); |
| private static AtomicInteger jettyPortCounter = new AtomicInteger(); |
| |
| static @TempDir File tempDir; |
| |
| protected static final String LOCALHOST; |
| |
| static { |
| try { |
| LOCALHOST = RssUtils.getHostIp(); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| protected static final int COORDINATOR_PORT_1 = 19999; |
| protected static final int NETTY_PORT = 21000; |
| protected static final String COORDINATOR_QUORUM = LOCALHOST + ":" + COORDINATOR_PORT_1; |
| |
| @BeforeAll |
| public static void setupServers(@TempDir File tmpDir) throws Exception { |
| File dataDir1 = new File(tmpDir, "data1"); |
| String grpcBasePath = dataDir1.getAbsolutePath(); |
| ShuffleServerConf grpcShuffleServerConf = buildShuffleServerConf(ServerType.GRPC, grpcBasePath); |
| createShuffleServer(grpcShuffleServerConf); |
| |
| File dataDir2 = new File(tmpDir, "data2"); |
| String nettyBasePath = dataDir2.getAbsolutePath(); |
| ShuffleServerConf nettyShuffleServerConf = |
| buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath); |
| createShuffleServer(nettyShuffleServerConf); |
| |
| startServers(); |
| grpcShuffleServerConfig = grpcShuffleServerConf; |
| nettyShuffleServerConfig = nettyShuffleServerConf; |
| } |
| |
| private static ShuffleServerConf buildShuffleServerConf(ServerType serverType, String basePath) |
| throws Exception { |
| ShuffleServerConf shuffleServerConf = buildShuffleServerConf(serverType); |
| shuffleServerConf.setString( |
| ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name()); |
| shuffleServerConf.set( |
| ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, EVENT_THRESHOLD_SIZE); |
| shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath)); |
| shuffleServerConf.set(ShuffleServerConf.RPC_METRICS_ENABLED, true); |
| shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L); |
| shuffleServerConf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 5000L); |
| shuffleServerConf.set(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_REFRESH_INTERVAL, 700); |
| shuffleServerConf.set(ShuffleServerConf.TOP_N_APP_SHUFFLE_DATA_SIZE_NUMBER, 5); |
| return shuffleServerConf; |
| } |
| |
| protected static ShuffleServerConf buildShuffleServerConf(ServerType serverType) |
| throws Exception { |
| ShuffleServerConf serverConf = new ShuffleServerConf(); |
| serverConf.setInteger( |
| "rss.rpc.server.port", SHUFFLE_SERVER_PORT + serverRpcPortCounter.getAndIncrement()); |
| serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name()); |
| serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath()); |
| serverConf.setString("rss.server.buffer.capacity", "671088640"); |
| serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0"); |
| serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0"); |
| serverConf.setString("rss.server.read.buffer.capacity", "335544320"); |
| serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM); |
| serverConf.setString("rss.server.heartbeat.delay", "1000"); |
| serverConf.setString("rss.server.heartbeat.interval", "1000"); |
| serverConf.setInteger("rss.jetty.http.port", 18080 + jettyPortCounter.getAndIncrement()); |
| serverConf.setInteger("rss.jetty.corePool.size", 64); |
| serverConf.setInteger("rss.rpc.executor.size", 10); |
| serverConf.setString("rss.server.hadoop.dfs.replication", "2"); |
| serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L); |
| serverConf.setBoolean("rss.server.health.check.enable", false); |
| serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); |
| serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L); |
| serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType); |
| if (serverType == ServerType.GRPC_NETTY) { |
| serverConf.setInteger( |
| ShuffleServerConf.NETTY_SERVER_PORT, NETTY_PORT + nettyPortCounter.getAndIncrement()); |
| } |
| return serverConf; |
| } |
| |
| protected static void createShuffleServer(ShuffleServerConf serverConf) throws Exception { |
| shuffleServers.add(new ShuffleServer(serverConf)); |
| } |
| |
| public static void startServers() throws Exception { |
| for (ShuffleServer shuffleServer : shuffleServers) { |
| shuffleServer.start(); |
| } |
| } |
| |
| private void registerAndRequireBuffer(String appId, int length, boolean isNettyMode) |
| throws Exception { |
| ShuffleServerGrpcClient shuffleServerClient = |
| isNettyMode |
| ? new ShuffleServerGrpcNettyClient( |
| LOCALHOST, |
| nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT), |
| nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)) |
| : new ShuffleServerGrpcClient( |
| LOCALHOST, grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)); |
| int shuffleId = 0; |
| int partitionId = 0; |
| List<PartitionRange> partitionIds = Lists.newArrayList(new PartitionRange(0, 3)); |
| |
| RssRegisterShuffleRequest registerShuffleRequest = |
| new RssRegisterShuffleRequest(appId, shuffleId, partitionIds, ""); |
| RssRegisterShuffleResponse registerResponse = |
| shuffleServerClient.registerShuffle(registerShuffleRequest); |
| assertSame(StatusCode.SUCCESS, registerResponse.getStatusCode()); |
| |
| List<ShuffleBlockInfo> blockInfos = |
| Lists.newArrayList( |
| new ShuffleBlockInfo( |
| shuffleId, |
| partitionId, |
| 0, |
| length, |
| 0, |
| new byte[length], |
| Lists.newArrayList(), |
| 0, |
| 100, |
| 0)); |
| |
| Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = Maps.newHashMap(); |
| partitionToBlocks.put(partitionId, blockInfos); |
| Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleToBlocks = Maps.newHashMap(); |
| shuffleToBlocks.put(shuffleId, partitionToBlocks); |
| |
| RssSendShuffleDataRequest sendShuffleDataRequest = |
| new RssSendShuffleDataRequest(appId, 1, 1000, shuffleToBlocks); |
| RssSendShuffleDataResponse response = |
| shuffleServerClient.sendShuffleData(sendShuffleDataRequest); |
| assertSame(StatusCode.SUCCESS, response.getStatusCode()); |
| shuffleServerClient.close(); |
| } |
| |
| private static Stream<Arguments> testTopNShuffleDataSizeOfAppCalcTaskProvider() { |
| return Stream.of(Arguments.of(true), Arguments.of(false)); |
| } |
| |
| @ParameterizedTest |
| @MethodSource("testTopNShuffleDataSizeOfAppCalcTaskProvider") |
| private void testTopNShuffleDataSizeOfAppCalcTask(boolean isNettyMode) throws Exception { |
| // Here is 6 app, but config max top n number is 5 |
| registerAndRequireBuffer("application_id_1", 1000, isNettyMode); |
| registerAndRequireBuffer("application_id_2", 2000, isNettyMode); |
| registerAndRequireBuffer("application_id_3", 3000, isNettyMode); |
| registerAndRequireBuffer("application_id_4", 4000, isNettyMode); |
| registerAndRequireBuffer("application_id_5", 5000, isNettyMode); |
| registerAndRequireBuffer("application_id_6", 6000, isNettyMode); |
| |
| Thread.sleep(500); |
| int jettyPort = |
| isNettyMode |
| ? nettyShuffleServerConfig.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) |
| : grpcShuffleServerConfig.getInteger(ShuffleServerConf.JETTY_HTTP_PORT); |
| String content = |
| TestUtils.httpGet(String.format("http://127.0.0.1:%s/metrics/server", jettyPort)); |
| LOG.info(content); |
| ObjectMapper mapper = new ObjectMapper(); |
| JsonNode actualObj = mapper.readTree(content); |
| JsonNode metricsNode = actualObj.get("metrics"); |
| Set<String> topNTotalDataSizeApps = new HashSet<>(); |
| Set<String> topNInMemoryDataSizeApps = new HashSet<>(); |
| for (int i = 0; i < metricsNode.size(); i++) { |
| JsonNode metricsName = metricsNode.get(i).get("name"); |
| if (ShuffleServerMetrics.TOPN_OF_TOTAL_DATA_SIZE_FOR_APP.equals(metricsName.textValue())) { |
| Iterator<Map.Entry<String, JsonNode>> it = metricsNode.get(i).fields(); |
| while (it.hasNext()) { |
| Map.Entry<String, JsonNode> entry = it.next(); |
| if ("labelValues".equalsIgnoreCase(entry.getKey())) { |
| topNTotalDataSizeApps.add(entry.getValue().toString()); |
| } |
| } |
| } |
| if (ShuffleServerMetrics.TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP.equals( |
| metricsName.textValue())) { |
| Iterator<Map.Entry<String, JsonNode>> it = metricsNode.get(i).fields(); |
| while (it.hasNext()) { |
| Map.Entry<String, JsonNode> entry = it.next(); |
| if ("labelValues".equalsIgnoreCase(entry.getKey())) { |
| topNInMemoryDataSizeApps.add(entry.getValue().toString()); |
| } |
| } |
| } |
| } |
| |
| Set<String> expectedTopNApps = |
| Sets.newHashSet( |
| "[\"application_id_6\"]", |
| "[\"application_id_5\"]", |
| "[\"application_id_4\"]", |
| "[\"application_id_3\"]", |
| "[\"application_id_2\"]"); |
| assertTrue( |
| expectedTopNApps.containsAll(topNTotalDataSizeApps) |
| && expectedTopNApps.size() == topNTotalDataSizeApps.size()); |
| assertTrue( |
| expectedTopNApps.containsAll(topNInMemoryDataSizeApps) |
| && expectedTopNApps.size() == topNInMemoryDataSizeApps.size()); |
| } |
| |
| @AfterEach |
| public void cleanMetrics() throws Exception { |
| ShuffleServerMetrics.clear(); |
| } |
| |
| @AfterAll |
| public static void shutdownServers() throws Exception { |
| for (ShuffleServer shuffleServer : shuffleServers) { |
| shuffleServer.stopServer(); |
| } |
| shuffleServers = Lists.newArrayList(); |
| ShuffleServerMetrics.clear(); |
| } |
| } |