blob: 75c49cd4dcf170fbacc7852d32687a84c29f9850 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.server;
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.exception.InvalidRequestException;
import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
import org.apache.uniffle.common.exception.NoRegisterException;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
import static org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleTaskManagerTest extends HadoopTestBase {
// Shared counter. Not referenced in the visible part of this file; presumably used by helper
// methods defined further down (TODO confirm).
private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
// Server under test. Each test creates its own instance; afterEach() stops it.
private ShuffleServer shuffleServer;
// JUnit-managed temporary directories; constructServerConfWithLocalfile() uses them as the
// storage base paths.
@TempDir File tempDir1;
@TempDir File tempDir2;
/** Clears and re-registers the server metrics, then checks that both temp directories exist. */
@BeforeEach
public void beforeEach() {
  ShuffleServerMetrics.clear();
  ShuffleServerMetrics.register();
  for (File tempDir : new File[] {tempDir1, tempDir2}) {
    assertTrue(tempDir.isDirectory());
  }
}
/**
 * Stops the server created by the test (if any) and clears the server metrics.
 *
 * <p>The field reset and metrics cleanup run in a {@code finally} block so that a failure inside
 * {@code stopServer()} does not leave a stale server reference or stale metrics behind for the
 * next test.
 *
 * @throws Exception if stopping the server fails
 */
@AfterEach
public void afterEach() throws Exception {
  try {
    if (shuffleServer != null) {
      shuffleServer.stopServer();
    }
  } finally {
    shuffleServer = null;
    ShuffleServerMetrics.clear();
  }
}
/**
 * Builds a server configuration from {@code server.conf} that uses LOCALFILE storage with the two
 * temp directories as comma-separated base paths.
 *
 * @return the configuration used by the localfile-based tests
 */
private ShuffleServerConf constructServerConfWithLocalfile() {
  final ShuffleServerConf serverConf =
      new ShuffleServerConf(ClassLoader.getSystemResource("server.conf").getFile());

  // endpoints
  serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  serverConf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  serverConf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);

  // memory and timeouts
  serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  serverConf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  serverConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
  serverConf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);

  // storage
  serverConf.setString(
      ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
  serverConf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  final String basePaths =
      String.join(",", tempDir1.getAbsolutePath(), tempDir2.getAbsolutePath());
  serverConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(), basePaths);
  return serverConf;
}
/**
 * Verifies that removing an app deletes its folder under every storage base path and that each
 * local storage reports a disk size of 0 afterwards.
 */
@Test
public void appPurgeWithLocalfileTest() throws Exception {
  final ShuffleServerConf serverConf = constructServerConfWithLocalfile();
  shuffleServer = new ShuffleServer(serverConf);
  final ShuffleTaskManager taskManager = shuffleServer.getShuffleTaskManager();
  final String appId = "removeShuffleDataWithLocalfileTest";
  final int shuffleCount = 4;

  // register each shuffle and cache data for it
  for (int shuffleId = 0; shuffleId < shuffleCount; shuffleId++) {
    taskManager.registerShuffle(
        appId,
        shuffleId,
        Lists.newArrayList(new PartitionRange(0, 1)),
        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
        StringUtils.EMPTY);
    final ShufflePartitionedData data = createPartitionedData(1, 1, 35);
    taskManager.requireBuffer(35);
    taskManager.cacheShuffleData(appId, shuffleId, false, data);
    taskManager.updateCachedBlockIds(appId, shuffleId, data.getBlockList());
  }
  assertEquals(1, taskManager.getAppIds().size());

  for (int shuffleId = 0; shuffleId < shuffleCount; shuffleId++) {
    taskManager.commitShuffle(appId, shuffleId);
  }
  taskManager.removeResources(appId, false);

  // the app folder must be gone from every base path
  for (String basePath : serverConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
    final File appDir = new File(basePath + "/" + appId);
    assertFalse(appDir.exists());
  }

  // once the app is removed, the recorded disk size should be 0
  final LocalStorageManager storageManager =
      (LocalStorageManager) shuffleServer.getStorageManager();
  for (LocalStorage storage : storageManager.getStorages()) {
    assertEquals(0, storage.getMetaData().getDiskSize().get());
  }
}
/**
 * Checks {@code requireBuffer} for a single partition while data is repeatedly cached into it,
 * and the huge-partition metrics before and after the app is removed.
 *
 * <p>The test expects a {@code NoRegisterException} before registration, successful buffer
 * requirement after registration and after the first write, and a
 * {@code NoBufferForHugePartitionException} after the second write.
 */
@Test
public void hugePartitionMemoryUsageLimitTest() throws Exception {
  String confFile = ClassLoader.getSystemResource("server.conf").getFile();
  ShuffleServerConf conf = new ShuffleServerConf(confFile);
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
  conf.setString(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD.key(), "1K");
  conf.setString("rss.server.buffer.capacity", "10K");
  conf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1);
  shuffleServer = new ShuffleServer(conf);
  ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
  String appId = "hugePartitionMemoryUsageLimitTest_appId";
  int shuffleId = 1;

  // case1: nothing registered yet, expect NoRegisterException
  try {
    shuffleTaskManager.requireBuffer(appId, shuffleId, Arrays.asList(1), 500);
    fail("Should throw NoRegisterException");
  } catch (Exception e) {
    assertTrue(e instanceof NoRegisterException, "Unexpected exception: " + e);
  }

  shuffleTaskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(1, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);

  // case2: registered, requiring a buffer must succeed
  try {
    long requiredId =
        shuffleTaskManager.requireBuffer(appId, shuffleId, Arrays.asList(1), 500);
    assertNotEquals(-1, requiredId);
  } catch (Exception e) {
    // keep the original exception as the cause so the failure is diagnosable
    fail("Should not throw Exception", e);
  }

  // case3: after the first write, requiring a buffer must still succeed
  ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
  shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0);
  shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());
  try {
    long requiredId =
        shuffleTaskManager.requireBuffer(appId, shuffleId, Arrays.asList(1), 500);
    assertNotEquals(-1, requiredId);
  } catch (Exception e) {
    fail("Should not throw Exception", e);
  }

  // case4: after the second write, the request is expected to be rejected
  partitionedData0 = createPartitionedData(1, 1, 500);
  shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0);
  shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());
  try {
    shuffleTaskManager.requireBuffer(appId, shuffleId, Arrays.asList(1), 500);
    fail("Should throw NoBufferForHugePartitionException");
  } catch (Exception e) {
    assertTrue(e instanceof NoBufferForHugePartitionException, "Unexpected exception: " + e);
  }

  // metrics test
  assertEquals(0, ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.get());
  assertEquals(0, ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.get());
  assertEquals(1, ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
  assertEquals(1, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
  assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
  assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());

  // case5: removing the app resets the huge-partition gauges
  shuffleTaskManager.removeResources(appId, false);
  assertEquals(0, ShuffleServerMetrics.gaugeHugePartitionNum.get());
  assertEquals(0, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
}
/**
 * Verifies that {@code updateCachedBlockIds} accumulates block sizes into both the app-level total
 * and the per-partition size reported by the shuffle task info.
 */
@Test
public void partitionDataSizeSummaryTest() throws Exception {
  final ShuffleServerConf serverConf =
      new ShuffleServerConf(ClassLoader.getSystemResource("server.conf").getFile());
  serverConf.setString(
      ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
  shuffleServer = new ShuffleServer(serverConf);
  final ShuffleTaskManager taskManager = shuffleServer.getShuffleTaskManager();

  final String appId = "partitionDataSizeSummaryTest";
  final int shuffleId = 1;
  taskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);

  // case1: a single update
  final ShufflePartitionedData firstBatch = createPartitionedData(1, 1, 35);
  final long firstSize = firstBatch.getTotalBlockSize();
  taskManager.updateCachedBlockIds(appId, shuffleId, 1, firstBatch.getBlockList());
  assertEquals(firstSize, taskManager.getShuffleTaskInfo(appId).getTotalDataSize());

  // case2: a second update for the same partition is added on top
  final ShufflePartitionedData secondBatch = createPartitionedData(1, 1, 35);
  final long secondSize = secondBatch.getTotalBlockSize();
  taskManager.updateCachedBlockIds(appId, shuffleId, 1, secondBatch.getBlockList());
  final long expectedTotal = firstSize + secondSize;
  assertEquals(expectedTotal, taskManager.getShuffleTaskInfo(appId).getTotalDataSize());
  assertEquals(
      expectedTotal, taskManager.getShuffleTaskInfo(appId).getPartitionDataSize(1, 1));
}
/**
 * Verifies that registering partition ranges creates one buffer per range in the buffer pool and
 * that re-registering an existing range keeps the buffer already in place.
 */
@Test
public void registerShuffleTest() throws Exception {
  final ShuffleServerConf serverConf =
      new ShuffleServerConf(ClassLoader.getSystemResource("server.conf").getFile());
  serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
  serverConf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  serverConf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  serverConf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  shuffleServer = new ShuffleServer(serverConf);
  final ShuffleTaskManager taskManager = shuffleServer.getShuffleTaskManager();

  final String appId = "registerTest1";
  final int shuffleId = 1;
  taskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  taskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(2, 3)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);

  final Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
      shuffleServer.getShuffleBufferManager().getBufferPool();

  // partitions 0 and 1 share one buffer, partitions 2 and 3 share another
  final RangeMap<Integer, ShuffleBuffer> registeredRanges = bufferPool.get(appId).get(shuffleId);
  assertNotNull(registeredRanges.get(0));
  final ShuffleBuffer firstRangeBuffer = registeredRanges.get(0);
  assertEquals(firstRangeBuffer, registeredRanges.get(1));
  assertNotNull(registeredRanges.get(2));
  assertEquals(registeredRanges.get(2), registeredRanges.get(3));

  // register again
  taskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  // look the pool up again rather than reusing the cached range map
  assertEquals(firstRangeBuffer, bufferPool.get(appId).get(shuffleId).get(0));
}
/**
 * End-to-end write path test against HDFS storage: pre-allocated buffer bookkeeping, caching data
 * for two partitions, commits, waiting for flushes, validating the flushed blocks, and finally the
 * commit failure expected after the flush manager's resources for the app are removed.
 *
 * <p>The statement order matters: expected committed-block counts passed to waitForFlush are
 * cumulative over the whole test.
 */
@Test
public void writeProcessTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
final String remoteStorage = HDFS_URI + "rss/test";
final String appId = "testAppId";
final int shuffleId = 1;
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
// register partition 1 and partition 2 of the same shuffle, each as its own range
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(1, 1)),
new RemoteStorageInfo(remoteStorage),
StringUtils.EMPTY);
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(2, 2)),
new RemoteStorageInfo(remoteStorage),
StringUtils.EMPTY);
// blocks written per partition, checked against storage by validate(...) below
final List<ShufflePartitionedBlock> expectedBlocks1 = Lists.newArrayList();
final List<ShufflePartitionedBlock> expectedBlocks2 = Lists.newArrayList();
// the size assertions below re-read this map after later calls, so it is expected to reflect
// the manager's current pre-allocation state
final Map<Long, PreAllocatedBufferInfo> bufferIds = shuffleTaskManager.getRequireBufferIds();
shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
assertEquals(3, bufferIds.size());
// required buffer should be clear if it doesn't receive data after timeout
// (SERVER_PRE_ALLOCATION_EXPIRED is set to 3000 above; the sleep is twice that)
Thread.sleep(6000);
assertEquals(0, bufferIds.size());
shuffleTaskManager.commitShuffle(appId, shuffleId);
// won't flush for partition 1-1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData0.getBlockList()));
long bufferId = shuffleTaskManager.requireBuffer(35);
assertEquals(1, bufferIds.size());
PreAllocatedBufferInfo pabi = bufferIds.get(bufferId);
assertEquals(35, pabi.getRequireSize());
StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData0.getBlockList());
// the required id won't be removed in shuffleTaskManager, it is removed in Grpc service
assertEquals(1, bufferIds.size());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
// manually release the pre allocate buffer
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
assertEquals(
1, shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getLongCardinality());
// flush for partition 1-1
ShufflePartitionedData partitionedData1 = createPartitionedData(1, 2, 35);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData1.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData1);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData1.getBlockList());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
// cumulative expectation: 2 blocks from this write + 1 committed earlier
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);
// won't flush for partition 1-1
ShufflePartitionedData partitionedData2 = createPartitionedData(1, 1, 30);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData2.getBlockList()));
// receive un-preAllocation data
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData2);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData2.getBlockList());
assertEquals(StatusCode.SUCCESS, sc);
// won't flush for partition 2-2
ShufflePartitionedData partitionedData3 = createPartitionedData(2, 1, 30);
expectedBlocks2.addAll(Lists.newArrayList(partitionedData3.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(30);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData3);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData3.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
// flush for partition 2-2
ShufflePartitionedData partitionedData4 = createPartitionedData(2, 1, 35);
expectedBlocks2.addAll(Lists.newArrayList(partitionedData4.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(35);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData4);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData4.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
// 3 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3);
// flush for partition 1-1
// NOTE: unlike the writes above, the cached block ids are updated before the data is cached
ShufflePartitionedData partitionedData5 = createPartitionedData(1, 2, 35);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData5.getBlockList());
expectedBlocks1.addAll(Lists.newArrayList(partitionedData5.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData5);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2);
shuffleTaskManager.commitShuffle(appId, shuffleId);
shuffleTaskManager.commitShuffle(appId, shuffleId);
// compare what was written for each partition with what is stored remotely
validate(appId, shuffleId, 1, expectedBlocks1, remoteStorage);
validate(appId, shuffleId, 2, expectedBlocks2, remoteStorage);
// flush for partition 1-1 (same arguments as the earlier partition 1-1 writes); these blocks
// are not added to expectedBlocks1 because validation has already run
ShufflePartitionedData partitionedData7 = createPartitionedData(1, 2, 35);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData7.getBlockList());
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData7);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2 + 2);
// after the flush manager drops the app's resources, a commit is expected to time out
// (SERVER_COMMIT_TIMEOUT is set to 10000 above)
shuffleFlushManager.removeResources(appId);
try {
shuffleTaskManager.commitShuffle(appId, shuffleId);
fail("Exception should be thrown");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Shuffle data commit timeout for"));
}
}
/**
 * Verifies stage-level cleanup for one app on HDFS: removing a single shuffle deletes only that
 * shuffle's folder and buffers, while removing the app deletes the whole app folder.
 *
 * @throws Exception if the server or the file system calls fail
 */
@Test
public void removeShuffleDataWithHdfsTest() throws Exception {
  final String storageBasePath = HDFS_URI + "rss/removeShuffleDataWithHdfsTest";
  final ShuffleServerConf serverConf =
      new ShuffleServerConf(ClassLoader.getSystemResource("server.conf").getFile());
  serverConf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  serverConf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  serverConf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  serverConf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  serverConf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  serverConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  serverConf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  serverConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  serverConf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  shuffleServer = new ShuffleServer(serverConf);
  final ShuffleTaskManager taskManager = shuffleServer.getShuffleTaskManager();

  final String appId = "removeShuffleDataTest1";
  for (int shuffleId = 0; shuffleId < 4; shuffleId++) {
    taskManager.registerShuffle(
        appId,
        shuffleId,
        Lists.newArrayList(new PartitionRange(0, 1)),
        new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
        StringUtils.EMPTY);
  }
  taskManager.refreshAppId(appId);
  assertEquals(1, taskManager.getAppIds().size());

  // cache the same data into shuffle 0 and shuffle 1
  final ShufflePartitionedData data = createPartitionedData(1, 1, 35);
  for (int i = 0; i < 2; i++) {
    taskManager.requireBuffer(35);
  }
  for (int shuffleId : new int[] {0, 1}) {
    taskManager.cacheShuffleData(appId, shuffleId, false, data);
    taskManager.updateCachedBlockIds(appId, shuffleId, data.getBlockList());
  }
  taskManager.refreshAppId(appId);
  taskManager.checkResourceStatus();
  assertEquals(1, taskManager.getAppIds().size());

  final ShuffleBufferManager bufferManager = shuffleServer.getShuffleBufferManager();
  final RangeMap<Integer, ShuffleBuffer> shuffle0Ranges =
      bufferManager.getBufferPool().get(appId).get(0);
  assertFalse(shuffle0Ranges.asMapOfRanges().isEmpty());
  taskManager.commitShuffle(appId, 0);

  // Before removing shuffle resources
  final String appBasePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, appId);
  final String shufflePath0 = ShuffleStorageUtils.getFullShuffleDataFolder(appBasePath, "0");
  assertTrue(fs.exists(new Path(shufflePath0)));

  // After removing the resources of shuffle id 0
  taskManager.removeShuffleDataSync(appId, 0);
  assertFalse(fs.exists(new Path(shufflePath0)));
  assertTrue(fs.exists(new Path(appBasePath)));
  assertNull(bufferManager.getBufferPool().get(appId).get(0));
  assertNotNull(bufferManager.getBufferPool().get(appId).get(1));

  // the shufflePurgeEvent only deletes the children folders;
  // once the app is removed, the whole app folder should be deleted.
  taskManager.removeResources(appId, false);
  assertFalse(fs.exists(new Path(appBasePath)));
}
/**
 * Verifies stage-level cleanup for one app on local storage: removing every shuffle empties the
 * app folder, and removing the app deletes the folder itself.
 */
@Test
public void removeShuffleDataWithLocalfileTest() throws Exception {
  final ShuffleServerConf serverConf = constructServerConfWithLocalfile();
  shuffleServer = new ShuffleServer(serverConf);
  final ShuffleTaskManager taskManager = shuffleServer.getShuffleTaskManager();
  final String appId = "removeShuffleDataWithLocalfileTest";
  final int shuffleCount = 4;

  for (int shuffleId = 0; shuffleId < shuffleCount; shuffleId++) {
    taskManager.registerShuffle(
        appId,
        shuffleId,
        Lists.newArrayList(new PartitionRange(0, 1)),
        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
        StringUtils.EMPTY);
    final ShufflePartitionedData data = createPartitionedData(1, 1, 35);
    taskManager.requireBuffer(35);
    taskManager.cacheShuffleData(appId, shuffleId, false, data);
    taskManager.updateCachedBlockIds(appId, shuffleId, data.getBlockList());
  }
  assertEquals(1, taskManager.getAppIds().size());

  // commit and then purge every shuffle individually
  for (int shuffleId = 0; shuffleId < shuffleCount; shuffleId++) {
    taskManager.commitShuffle(appId, shuffleId);
    taskManager.removeShuffleDataSync(appId, shuffleId);
  }

  // an app folder that still exists must be empty
  for (String basePath : serverConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
    final File[] children = new File(basePath + "/" + appId).listFiles();
    if (children == null) {
      continue;
    }
    assertEquals(0, children.length);
  }

  // the shufflePurgeEvent only deletes the children folders;
  // once the app is removed, the whole app folder should be deleted.
  taskManager.removeResources(appId, false);
  for (String basePath : serverConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
    final File appDir = new File(basePath + "/" + appId);
    assertFalse(appDir.exists());
  }
}
/**
 * Verifies heartbeat-based expiry: an app that keeps refreshing stays registered while an idle app
 * is removed, a removed app can be registered again, and once refreshing stops every app and its
 * cached block ids are cleared.
 *
 * <p>The sleeps are tied to {@code SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT} (2000 here), so the test
 * is timing-sensitive.
 */
@Test
public void clearTest() throws Exception {
  ShuffleServerConf conf = new ShuffleServerConf();
  String storageBasePath = HDFS_URI + "rss/clearTest";
  final int shuffleId = 1;
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  shuffleServer = new ShuffleServer(conf);
  ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
  shuffleTaskManager.registerShuffle(
      "clearTest1",
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  shuffleTaskManager.registerShuffle(
      "clearTest2",
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  shuffleTaskManager.refreshAppId("clearTest1");
  shuffleTaskManager.refreshAppId("clearTest2");
  assertEquals(2, shuffleTaskManager.getAppIds().size());
  ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);

  // keep refreshing application "clearTest1" while "clearTest2" stays idle
  for (int retry = 0; retry < 10; retry++) {
    Thread.sleep(1000);
    shuffleTaskManager.cacheShuffleData("clearTest1", shuffleId, false, partitionedData0);
    shuffleTaskManager.updateCachedBlockIds(
        "clearTest1", shuffleId, partitionedData0.getBlockList());
    shuffleTaskManager.refreshAppId("clearTest1");
    shuffleTaskManager.checkResourceStatus();
  }

  // application "clearTest2" was removed according to rss.server.app.expired.withoutHeartbeat
  assertEquals(Sets.newHashSet("clearTest1"), shuffleTaskManager.getAppIds());
  assertEquals(
      1, shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).getLongCardinality());

  // register again
  shuffleTaskManager.registerShuffle(
      "clearTest2",
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  shuffleTaskManager.refreshAppId("clearTest2");
  shuffleTaskManager.checkResourceStatus();
  assertEquals(Sets.newHashSet("clearTest1", "clearTest2"), shuffleTaskManager.getAppIds());

  // stop refreshing: both apps should expire
  Thread.sleep(5000);
  shuffleTaskManager.checkResourceStatus();
  // wait resource delete
  Thread.sleep(3000);
  // typed emptySet() instead of the raw Collections.EMPTY_SET constant
  assertEquals(Collections.emptySet(), shuffleTaskManager.getAppIds());
  assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).isEmpty());
}
/**
 * Verifies that calling {@code removeResources} for the same app from three threads at once
 * leaves no app registered and no cached block ids behind.
 *
 * <p>The wait for the worker threads is bounded so that a stuck removal fails the test instead of
 * hanging the whole suite.
 */
@Test
public void clearMultiTimesTest() throws Exception {
  ShuffleServerConf conf = new ShuffleServerConf();
  String storageBasePath = HDFS_URI + "rss/clearMultiTimesTest";
  final int shuffleId = 1;
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  shuffleServer = new ShuffleServer(conf);
  ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
  String appId = "clearMultiTimesTest";
  shuffleTaskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  shuffleTaskManager.refreshAppId(appId);
  assertEquals(1, shuffleTaskManager.getAppIds().size());
  shuffleTaskManager.checkResourceStatus();
  assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds());

  // remove the same app concurrently from three threads
  final int removerCount = 3;
  CountDownLatch countDownLatch = new CountDownLatch(removerCount);
  for (int i = 0; i < removerCount; i++) {
    new Thread(
            () -> {
              try {
                shuffleTaskManager.removeResources(appId, false);
              } finally {
                countDownLatch.countDown();
              }
            })
        .start();
  }
  // bounded wait: fail rather than block forever if a removal never returns
  assertTrue(
      countDownLatch.await(60, TimeUnit.SECONDS),
      "Concurrent removeResources calls did not finish in time");

  // typed emptySet() instead of the raw Collections.EMPTY_SET constant
  assertEquals(Collections.emptySet(), shuffleTaskManager.getAppIds());
  assertTrue(shuffleTaskManager.getCachedBlockIds(appId, shuffleId).isEmpty());
}
/**
 * Verifies that calling {@code removeResourcesByShuffleIds} for the same shuffle from three
 * threads at once brings the total-partition gauge from 1 back to 0.
 *
 * <p>The wait for the worker threads is bounded so that a stuck removal fails the test instead of
 * hanging the whole suite.
 */
@Test
public void removeResourcesByShuffleIdsMultiTimesTest() throws Exception {
  ShuffleServerConf conf = new ShuffleServerConf();
  String storageBasePath = HDFS_URI + "rss/removeResourcesByShuffleIdsMultiTimesTest";
  final int shuffleId = 1;
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  shuffleServer = new ShuffleServer(conf);
  ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
  String appId = "removeResourcesByShuffleIdsMultiTimesTest";
  shuffleTaskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  shuffleTaskManager.refreshAppId(appId);
  assertEquals(1, shuffleTaskManager.getAppIds().size());
  shuffleTaskManager.checkResourceStatus();
  assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds());
  assertEquals(1, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());

  // remove the same shuffle concurrently from three threads
  final int removerCount = 3;
  CountDownLatch countDownLatch = new CountDownLatch(removerCount);
  List<Integer> shuffleIds = Lists.newArrayList(shuffleId);
  for (int i = 0; i < removerCount; i++) {
    new Thread(
            () -> {
              try {
                shuffleTaskManager.removeResourcesByShuffleIds(appId, shuffleIds);
              } finally {
                countDownLatch.countDown();
              }
            })
        .start();
  }
  // bounded wait: fail rather than block forever if a removal never returns
  assertTrue(
      countDownLatch.await(60, TimeUnit.SECONDS),
      "Concurrent removeResourcesByShuffleIds calls did not finish in time");
  assertEquals(0, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
}
/**
 * Verifies that {@code getBlockIdsByPartitionId} returns only the block ids of the requested
 * partition, including the all-zero block id and the block id built from the layout's maximum
 * field values.
 */
@Test
public void getBlockIdsByPartitionIdTest() {
  final BlockIdLayout layout = BlockIdLayout.DEFAULT;
  final ShuffleTaskManager taskManager =
      new ShuffleTaskManager(new ShuffleServerConf(), null, null, null);
  final int wantedPartition = 5;
  final Roaring64NavigableMap allBlockIds = Roaring64NavigableMap.bitmapOf();
  final Roaring64NavigableMap wantedBlockIds = Roaring64NavigableMap.bitmapOf();

  // two blocks for every (task, partition) pair in 1..9 x 1..9
  for (int taskId = 1; taskId < 10; taskId++) {
    for (int partitionId = 1; partitionId < 10; partitionId++) {
      for (int seqNo = 0; seqNo < 2; seqNo++) {
        final long blockId = layout.getBlockId(seqNo, partitionId, taskId);
        allBlockIds.addLong(blockId);
        if (partitionId != wantedPartition) {
          continue;
        }
        wantedBlockIds.addLong(blockId);
      }
    }
  }

  // a regular partition; each call gets its own fresh bitmap as the third argument
  Roaring64NavigableMap found =
      taskManager.getBlockIdsByPartitionId(
          Sets.newHashSet(wantedPartition),
          allBlockIds,
          Roaring64NavigableMap.bitmapOf(),
          layout);
  assertEquals(wantedBlockIds, found);

  // lower boundary: every field is 0
  allBlockIds.addLong(layout.getBlockId(0, 0, 0));
  found =
      taskManager.getBlockIdsByPartitionId(
          Sets.newHashSet(0), allBlockIds, Roaring64NavigableMap.bitmapOf(), layout);
  assertEquals(Roaring64NavigableMap.bitmapOf(0L), found);

  // upper boundary: every field at the layout's maximum
  final long maxBlockId =
      layout.getBlockId(layout.maxSequenceNo, layout.maxPartitionId, layout.maxTaskAttemptId);
  allBlockIds.addLong(maxBlockId);
  found =
      taskManager.getBlockIdsByPartitionId(
          Sets.newHashSet(layout.maxPartitionId),
          allBlockIds,
          Roaring64NavigableMap.bitmapOf(),
          layout);
  assertEquals(Roaring64NavigableMap.bitmapOf(maxBlockId), found);
}
/**
 * Checks that {@code getBlockIdsByPartitionId} handles a set of several partitions: a sub-range
 * yields exactly the ids of that sub-range, and requesting every partition yields every id.
 */
@Test
public void getBlockIdsByMultiPartitionTest() {
  final BlockIdLayout layout = BlockIdLayout.DEFAULT;
  final ShuffleTaskManager taskManager =
      new ShuffleTaskManager(new ShuffleServerConf(), null, null, null);
  final int firstWanted = 3;
  final int lastWanted = 5;
  final Roaring64NavigableMap wantedIds = Roaring64NavigableMap.bitmapOf();
  final Roaring64NavigableMap allIds = Roaring64NavigableMap.bitmapOf();
  final Set<Integer> wantedPartitions = Sets.newHashSet();
  final Set<Integer> everyPartition = Sets.newHashSet();

  // Partitions 1..9, each with tasks 1..9 and sequence numbers 0..1.
  for (int partition = 1; partition < 10; partition++) {
    boolean wanted = firstWanted <= partition && partition <= lastWanted;
    everyPartition.add(partition);
    if (wanted) {
      wantedPartitions.add(partition);
    }
    for (int task = 1; task < 10; task++) {
      for (int seq = 0; seq < 2; seq++) {
        long id = layout.getBlockId(seq, partition, task);
        allIds.addLong(id);
        if (wanted) {
          wantedIds.addLong(id);
        }
      }
    }
  }

  assertEquals(
      wantedIds,
      taskManager.getBlockIdsByPartitionId(
          wantedPartitions, allIds, Roaring64NavigableMap.bitmapOf(), layout));
  assertEquals(
      allIds,
      taskManager.getBlockIdsByPartitionId(
          everyPartition, allIds, Roaring64NavigableMap.bitmapOf(), layout));
}
/**
 * Reports finished block ids for ten partitions and checks that {@code getFinishedBlockIds}
 * returns exactly the ids of the requested partitions [startPartition, endPartition]. Also
 * checks that reporting again for the same app/shuffle with a different bitmap count is rejected
 * with an {@link InvalidRequestException}.
 */
@Test
public void testGetFinishedBlockIds() throws Exception {
  ShuffleServerConf conf = new ShuffleServerConf();
  String storageBasePath = HDFS_URI + "rss/test";
  String appId = "test_app";
  final int shuffleId = 1;
  final int bitNum = 3;
  final int partitionNum = 10;
  final int taskNum = 10;
  final int blocksPerTask = 2;
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  shuffleServer = new ShuffleServer(conf);
  ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
  ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
  StorageManager storageManager = shuffleServer.getStorageManager();
  ShuffleTaskManager shuffleTaskManager =
      new ShuffleTaskManager(conf, shuffleFlushManager, shuffleBufferManager, storageManager);
  int startPartition = 6;
  int endPartition = 9;
  BlockIdLayout layout = BlockIdLayout.DEFAULT;
  Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
  Map<Integer, long[]> blockIdsToReport = Maps.newHashMap();
  for (int partitionId = 0; partitionId < partitionNum; partitionId++) {
    shuffleTaskManager.registerShuffle(
        appId,
        shuffleId,
        Lists.newArrayList(new PartitionRange(partitionId, partitionId)),
        new RemoteStorageInfo(storageBasePath),
        StringUtils.EMPTY);
    long[] blockIds = new long[taskNum * blocksPerTask];
    for (int taskId = 0; taskId < taskNum; taskId++) {
      for (int i = 0; i < blocksPerTask; i++) {
        long blockId = layout.getBlockId(i, partitionId, taskId);
        blockIds[taskId * blocksPerTask + i] = blockId;
      }
    }
    blockIdsToReport.putIfAbsent(partitionId, blockIds);
    // Expect exactly the partitions that are requested below, bounded on both ends.
    if (partitionId >= startPartition && partitionId <= endPartition) {
      expectedBlockIds.add(blockIds);
    }
  }
  assertEquals(
      (endPartition - startPartition + 1) * taskNum * blocksPerTask,
      expectedBlockIds.getLongCardinality());
  shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum);
  Set<Integer> requestPartitions = Sets.newHashSet();
  for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {
    requestPartitions.add(partitionId);
  }
  byte[] serializeBitMap =
      shuffleTaskManager.getFinishedBlockIds(appId, shuffleId, requestPartitions, layout);
  Roaring64NavigableMap resBlockIds = RssUtils.deserializeBitMap(serializeBitMap);
  assertEquals(expectedBlockIds, resBlockIds);
  try {
    // calling with same appId and shuffleId but different bitmapNum should fail
    shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum - 1);
    fail("Exception should be thrown");
  } catch (InvalidRequestException e) {
    // Expected value first, so a failure report labels the two strings correctly.
    assertEquals("Request expects 2 bitmaps, but there are 3 bitmaps!", e.getMessage());
  }
}
/**
 * Checks that reporting finished block ids for an app that was never registered is rejected with
 * a {@link RuntimeException} whose message says the app is expired.
 */
@Test
public void testAddFinishedBlockIdsWithoutRegister() throws Exception {
  ShuffleServerConf conf = new ShuffleServerConf();
  String storageBasePath = HDFS_URI + "rss/test";
  String appId = "testAddFinishedBlockIdsToExpiredApp";
  final int shuffleId = 1;
  final int bitNum = 3;
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  shuffleServer = new ShuffleServer(conf);
  ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
  ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
  StorageManager storageManager = shuffleServer.getStorageManager();
  ShuffleTaskManager shuffleTaskManager =
      new ShuffleTaskManager(conf, shuffleFlushManager, shuffleBufferManager, storageManager);
  // No registerShuffle call on purpose: the app is unknown to this task manager.
  Map<Integer, long[]> blockIdsToReport = Maps.newHashMap();
  try {
    shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum);
    fail("Exception should be thrown");
  } catch (RuntimeException e) {
    // assertEquals reports the actual message on mismatch and tolerates a null message.
    assertEquals("appId[" + appId + "] is expired!", e.getMessage());
  }
}
/**
 * Exercises {@code checkLeakShuffleData} on local-file storage: after the app's data has been
 * written and the app has been removed, a manually created directory named after the app is
 * expected to be deleted as leaked data, while the hidden directory named {@link
 * LocalStorageChecker#CHECKER_DIR_NAME} is expected to be left alone.
 *
 * @param tempDir per-test temporary directory used as the only storage base path
 */
@Test
public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Exception {
  final String appId = "clearLocalTest_appId";
  ShuffleServerConf conf = new ShuffleServerConf();
  final int shuffleId = 1;
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
  conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
  conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 64L);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
  conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
  conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
  conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
  conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
  conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
  conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
  // make sure not to check leak shuffle data automatically
  conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL, 600 * 1000L);
  shuffleServer = new ShuffleServer(conf);
  ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
  shuffleTaskManager.registerShuffle(
      appId,
      shuffleId,
      Lists.newArrayList(new PartitionRange(0, 1)),
      RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
      StringUtils.EMPTY);
  shuffleTaskManager.refreshAppId(appId);
  assertEquals(1, shuffleTaskManager.getAppIds().size());
  ShufflePartitionedData shuffleData = createPartitionedData(1, 1, 48);
  // make sure shuffle data flush to disk
  int retry = 0;
  while (retry < 5) {
    Thread.sleep(1000);
    shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, shuffleData);
    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData.getBlockList());
    shuffleTaskManager.refreshAppId(appId);
    shuffleTaskManager.checkResourceStatus();
    retry++;
  }
  StorageManager storageManager = shuffleServer.getStorageManager();
  assertTrue(storageManager instanceof LocalStorageManager);
  LocalStorageManager localStorageManager = (LocalStorageManager) storageManager;
  // parse appIds from storage
  Set<String> appIdsOnDisk = getAppIdsOnDisk(localStorageManager);
  assertEquals(appIdsOnDisk.size(), shuffleTaskManager.getAppIds().size());
  assertTrue(appIdsOnDisk.contains(appId));
  // make sure heartbeat timeout and resources are removed
  Awaitility.await()
      .timeout(10, TimeUnit.SECONDS)
      .until(() -> shuffleTaskManager.getAppIds().size() == 0);
  // Create the hidden dir to simulate LocalStorageChecker's check
  String storageDir = tempDir.getAbsolutePath();
  File hiddenFile = new File(storageDir + "/" + LocalStorageChecker.CHECKER_DIR_NAME);
  hiddenFile.mkdir();
  // Fail here, not at the end, if the hidden dir could not be set up.
  assertTrue(hiddenFile.isDirectory());
  Awaitility.await()
      .timeout(10, TimeUnit.SECONDS)
      .until(() -> !getAppIdsOnDisk(localStorageManager).contains(appId));
  // Re-read the storage: the earlier snapshot predates the hidden dir and could never contain it.
  assertFalse(
      getAppIdsOnDisk(localStorageManager).contains(LocalStorageChecker.CHECKER_DIR_NAME));
  // mock leak shuffle data
  File file = new File(tempDir, appId);
  assertFalse(file.exists());
  file.mkdir();
  assertTrue(file.exists());
  // execute checkLeakShuffleData
  shuffleTaskManager.checkLeakShuffleData();
  assertFalse(file.exists());
  assertTrue(hiddenFile.exists());
}
/**
 * Collects the app ids reported by every storage of the given manager.
 *
 * @param localStorageManager manager whose storages are queried
 * @return a new set holding the union of {@code getAppIds()} over all storages
 */
private Set<String> getAppIdsOnDisk(LocalStorageManager localStorageManager) {
  Set<String> collected = new HashSet<>();
  localStorageManager.getStorages().forEach(storage -> collected.addAll(storage.getAppIds()));
  return collected;
}
/**
 * Polls once per second until the number of committed block ids for the given app/shuffle equals
 * {@code expectedBlockNum}; fails the test if that has not happened after six polls.
 *
 * @param shuffleFlushManager flush manager to query for committed block ids
 * @param appId application id
 * @param shuffleId shuffle id
 * @param expectedBlockNum committed block count to wait for
 */
private void waitForFlush(
    ShuffleFlushManager shuffleFlushManager, String appId, int shuffleId, int expectedBlockNum)
    throws Exception {
  for (int attempt = 0; attempt <= 5; attempt++) {
    int committed = shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getIntCardinality();
    if (committed == expectedBlockNum) {
      return;
    }
    Thread.sleep(1000);
  }
  fail("Timeout to flush data");
}
/**
 * Builds partitioned data for one partition out of freshly generated blocks.
 *
 * @param partitionId partition the data belongs to
 * @param blockNum number of blocks to generate
 * @param dataLength payload length of each block, in bytes
 * @return the partitioned data wrapping the generated blocks
 */
private ShufflePartitionedData createPartitionedData(
    int partitionId, int blockNum, int dataLength) {
  return new ShufflePartitionedData(partitionId, createBlock(blockNum, dataLength));
}
/**
 * Generates {@code num} blocks, each carrying {@code length} random bytes, the CRC32 of those
 * bytes and a block id taken from the shared {@code ATOMIC_INT} counter.
 *
 * @param num number of blocks to create
 * @param length payload length of each block, in bytes
 * @return the generated blocks
 */
private ShufflePartitionedBlock[] createBlock(int num, int length) {
  ShufflePartitionedBlock[] blocks = new ShufflePartitionedBlock[num];
  // One generator for the whole batch instead of allocating a new one per block.
  Random random = new Random();
  for (int i = 0; i < num; i++) {
    byte[] buf = new byte[length];
    random.nextBytes(buf);
    blocks[i] =
        new ShufflePartitionedBlock(
            length, length, ChecksumUtils.getCrc32(buf), ATOMIC_INT.incrementAndGet(), 0, buf);
  }
  return blocks;
}
/**
 * Reads the shuffle data of one partition back through a {@link HadoopClientReadHandler} and
 * asserts that every given block has a buffer segment with the same block id, length and CRC.
 *
 * @param appId application id
 * @param shuffleId shuffle id
 * @param partitionId partition to read
 * @param blocks blocks expected to be found in the stored data
 * @param basePath storage base path handed to the read handler
 */
private void validate(
    String appId,
    int shuffleId,
    int partitionId,
    List<ShufflePartitionedBlock> blocks,
    String basePath) {
  Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
  Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
  for (ShufflePartitionedBlock spb : blocks) {
    expectBlockIds.addLong(spb.getBlockId());
  }
  HadoopClientReadHandler handler =
      new HadoopClientReadHandler(
          appId,
          shuffleId,
          partitionId,
          100,
          1,
          10,
          1000,
          expectBlockIds,
          processBlockIds,
          basePath,
          new Configuration());
  ShuffleDataResult sdr = handler.readShuffleData();
  List<BufferSegment> bufferSegments = sdr.getBufferSegments();
  int matchNum = 0;
  for (ShufflePartitionedBlock block : blocks) {
    // Only the first segment with a matching id is compared for each block.
    for (BufferSegment bs : bufferSegments) {
      if (bs.getBlockId() == block.getBlockId()) {
        assertEquals(block.getLength(), bs.getLength());
        assertEquals(block.getCrc(), bs.getCrc());
        matchNum++;
        break;
      }
    }
  }
  // Every expected block must have been matched by some segment.
  assertEquals(blocks.size(), matchNum);
}
/**
 * Checks {@code ShuffleTaskManager.getMaxConcurrencyWriting} for a non-positive client value, a
 * client value below the configured client limitation, and a client value above it.
 */
@Test
public void testGetMaxConcurrencyWriting() {
  final int serverSideValue = 10;
  final int clientLimitation = 30;
  final ShuffleServerConf serverConf = new ShuffleServerConf();
  serverConf.set(SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, serverSideValue);
  serverConf.set(CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION, clientLimitation);

  // Non-positive client value: the server-side setting is expected.
  assertEquals(serverSideValue, ShuffleTaskManager.getMaxConcurrencyWriting(-1, serverConf));
  // Client value below the limitation: expected unchanged.
  assertEquals(24, ShuffleTaskManager.getMaxConcurrencyWriting(24, serverConf));
  // Client value above the limitation: the limitation is expected.
  assertEquals(clientLimitation, ShuffleTaskManager.getMaxConcurrencyWriting(40, serverConf));
}
/**
 * Simulates an app whose resources are about to be removed as expired while the same app
 * registers another shuffle, then checks that requiring a buffer for the newly registered
 * shuffle does not return the NO_REGISTER status code (-4).
 */
@Test
public void testRegisterShuffleAfterAppIsExpired() throws Exception {
  String confFile = ClassLoader.getSystemResource("server.conf").getFile();
  ShuffleServerConf conf = new ShuffleServerConf(confFile);
  final String storageBasePath = HDFS_URI + "rss/testRegisterShuffleAfterAppIsExpired";
  conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
  conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
  conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
  shuffleServer = new ShuffleServer(conf);
  ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
  String appId = "appId1";
  shuffleTaskManager.registerShuffle(
      appId,
      1,
      Lists.newArrayList(new PartitionRange(0, 1)),
      new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
      StringUtils.EMPTY);
  shuffleTaskManager.refreshAppId(appId);
  assertEquals(1, shuffleTaskManager.getAppIds().size());
  ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
  shuffleTaskManager.requireBuffer(35);
  shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
  shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
  shuffleTaskManager.refreshAppId(appId);
  shuffleTaskManager.checkResourceStatus();
  assertEquals(1, shuffleTaskManager.getAppIds().size());
  // App is expired due to no heartbeat, so it was added to expired queue and will be removed
  // resource soon.
  Thread thread =
      new Thread(
          () -> {
            try {
              Thread.sleep(1000);
              shuffleTaskManager.removeResources(appId, true);
            } catch (InterruptedException e) {
              // Keep the interrupt flag visible to whoever inspects this thread afterwards.
              Thread.currentThread().interrupt();
              throw new RssException(e);
            }
          });
  thread.start();
  // At this moment, this app re-registers shuffle.
  shuffleTaskManager.registerShuffle(
      appId,
      2,
      Lists.newArrayList(new PartitionRange(0, 1)),
      new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
      StringUtils.EMPTY);
  // Wait for the removal attempt to finish instead of relying on a fixed sleep to outlast it.
  thread.join();
  // The NO_REGISTER status code should not appear.
  assertTrue(shuffleTaskManager.requireBuffer(appId, 2, Arrays.asList(1), 35) != -4);
  shuffleTaskManager.removeResources(appId, false);
}
}