blob: 1d799fb58180fb2ab20f78686a2052c2fb6c5255 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.server;
import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.HdfsTestBase;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public class ShuffleTaskManagerTest extends HdfsTestBase {
private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
private ShuffleServer shuffleServer;
@BeforeEach
public void beforeEach() {
ShuffleServerMetrics.clear();
ShuffleServerMetrics.register();
}
@AfterEach
public void afterEach() throws Exception {
if (shuffleServer != null) {
shuffleServer.stopServer();
shuffleServer = null;
}
}
@Test
public void hugePartitionMemoryUsageLimitTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
conf.setString(ShuffleServerConf.HUGE_PARTITION_SIZE_THRESHOLD.key(), "1K");
conf.setString("rss.server.buffer.capacity", "10K");
conf.set(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO, 0.1);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "hugePartitionMemoryUsageLimitTest_appId";
int shuffleId = 1;
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(1, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
// case1
long requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 500);
assertNotEquals(-1, requiredId);
// case2
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());
requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 500);
assertNotEquals(-1, requiredId);
// case3
partitionedData0 = createPartitionedData(1, 1, 500);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());
requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 500);
assertEquals(-1, requiredId);
// metrics test
assertEquals(1, ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.get());
assertEquals(0, ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.get());
assertEquals(1, ShuffleServerMetrics.counterTotalAppWithHugePartitionNum.get());
assertEquals(1, ShuffleServerMetrics.counterTotalHugePartitionNum.get());
assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
// case4
shuffleTaskManager.removeResources(appId);
assertEquals(0, ShuffleServerMetrics.gaugeHugePartitionNum.get());
assertEquals(0, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
}
@Test
public void partitionDataSizeSummaryTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "partitionDataSizeSummaryTest";
int shuffleId = 1;
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
// case1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
long size1 = partitionedData0.getTotalBlockSize();
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());
assertEquals(
size1,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()
);
// case2
partitionedData0 = createPartitionedData(1, 1, 35);
long size2 = partitionedData0.getTotalBlockSize();
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, partitionedData0.getBlockList());
assertEquals(
size1 + size2,
shuffleTaskManager.getShuffleTaskInfo(appId).getTotalDataSize()
);
assertEquals(
size1 + size2,
shuffleTaskManager.getShuffleTaskInfo(appId).getPartitionDataSize(1, 1)
);
}
@Test
public void registerShuffleTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "registerTest1";
int shuffleId = 1;
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(2, 3)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool =
shuffleServer.getShuffleBufferManager().getBufferPool();
assertNotNull(bufferPool.get(appId).get(shuffleId).get(0));
ShuffleBuffer buffer = bufferPool.get(appId).get(shuffleId).get(0);
assertEquals(buffer, bufferPool.get(appId).get(shuffleId).get(1));
assertNotNull(bufferPool.get(appId).get(shuffleId).get(2));
assertEquals(bufferPool.get(appId).get(shuffleId).get(2), bufferPool.get(appId).get(shuffleId).get(3));
// register again
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
assertEquals(buffer, bufferPool.get(appId).get(shuffleId).get(0));
}
@Test
public void writeProcessTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
final String remoteStorage = HDFS_URI + "rss/test";
final String appId = "testAppId";
final int shuffleId = 1;
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(1, 1)),
new RemoteStorageInfo(remoteStorage),
StringUtils.EMPTY
);
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(2, 2)),
new RemoteStorageInfo(remoteStorage),
StringUtils.EMPTY
);
final List<ShufflePartitionedBlock> expectedBlocks1 = Lists.newArrayList();
final List<ShufflePartitionedBlock> expectedBlocks2 = Lists.newArrayList();
final Map<Long, PreAllocatedBufferInfo> bufferIds = shuffleTaskManager.getRequireBufferIds();
shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
shuffleTaskManager.requireBuffer(10);
assertEquals(3, bufferIds.size());
// required buffer should be clear if it doesn't receive data after timeout
Thread.sleep(6000);
assertEquals(0, bufferIds.size());
shuffleTaskManager.commitShuffle(appId, shuffleId);
// won't flush for partition 1-1
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData0.getBlockList()));
long bufferId = shuffleTaskManager.requireBuffer(35);
assertEquals(1, bufferIds.size());
PreAllocatedBufferInfo pabi = bufferIds.get(bufferId);
assertEquals(35, pabi.getRequireSize());
StatusCode sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData0.getBlockList());
// the required id won't be removed in shuffleTaskManager, it is removed in Grpc service
assertEquals(1, bufferIds.size());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
// manually release the pre allocate buffer
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
assertEquals(1, shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getLongCardinality());
// flush for partition 1-1
ShufflePartitionedData partitionedData1 = createPartitionedData(1, 2, 35);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData1.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData1);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData1.getBlockList());
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1);
// won't flush for partition 1-1
ShufflePartitionedData partitionedData2 = createPartitionedData(1, 1, 30);
expectedBlocks1.addAll(Lists.newArrayList(partitionedData2.getBlockList()));
// receive un-preAllocation data
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, partitionedData2);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData2.getBlockList());
assertEquals(StatusCode.SUCCESS, sc);
// won't flush for partition 2-2
ShufflePartitionedData partitionedData3 = createPartitionedData(2, 1, 30);
expectedBlocks2.addAll(Lists.newArrayList(partitionedData3.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(30);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData3);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData3.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
// flush for partition 2-2
ShufflePartitionedData partitionedData4 = createPartitionedData(2, 1, 35);
expectedBlocks2.addAll(Lists.newArrayList(partitionedData4.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(35);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData4);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData4.getBlockList());
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.commitShuffle(appId, shuffleId);
// 3 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3);
// flush for partition 1-1
ShufflePartitionedData partitionedData5 = createPartitionedData(1, 2, 35);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData5.getBlockList());
expectedBlocks1.addAll(Lists.newArrayList(partitionedData5.getBlockList()));
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData5);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2);
shuffleTaskManager.commitShuffle(appId, shuffleId);
shuffleTaskManager.commitShuffle(appId, shuffleId);
validate(appId, shuffleId, 1, expectedBlocks1, remoteStorage);
validate(appId, shuffleId, 2, expectedBlocks2, remoteStorage);
// flush for partition 0-1
ShufflePartitionedData partitionedData7 = createPartitionedData(1, 2, 35);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionedData7.getBlockList());
bufferId = shuffleTaskManager.requireBuffer(70);
sc = shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, partitionedData7);
assertEquals(StatusCode.SUCCESS, sc);
shuffleTaskManager.removeAndReleasePreAllocatedBuffer(bufferId);
// 2 new blocks should be committed
waitForFlush(shuffleFlushManager, appId, shuffleId, 2 + 1 + 3 + 2 + 2);
shuffleFlushManager.removeResources(appId);
try {
shuffleTaskManager.commitShuffle(appId, shuffleId);
fail("Exception should be thrown");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Shuffle data commit timeout for"));
}
}
/**
* Clean up the shuffle data of stage level for one app
* @throws Exception
*/
@Test
public void removeShuffleDataWithHdfsTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
String storageBasePath = HDFS_URI + "rss/removeShuffleDataWithHdfsTest";
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "removeShuffleDataTest1";
for (int i = 0; i < 4; i++) {
shuffleTaskManager.registerShuffle(
appId,
i,
Lists.newArrayList(new PartitionRange(0, 1)),
new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
StringUtils.EMPTY
);
}
shuffleTaskManager.refreshAppId(appId);
assertEquals(1, shuffleTaskManager.getAppIds().size());
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0.getBlockList());
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();
assertEquals(1, shuffleTaskManager.getAppIds().size());
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
RangeMap<Integer, ShuffleBuffer> rangeMap = shuffleBufferManager.getBufferPool().get(appId).get(0);
assertFalse(rangeMap.asMapOfRanges().isEmpty());
shuffleTaskManager.commitShuffle(appId, 0);
// Before removing shuffle resources
String appBasePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, appId);
String shufflePath0 = ShuffleStorageUtils.getFullShuffleDataFolder(appBasePath, "0");
assertTrue(fs.exists(new Path(shufflePath0)));
// After removing the shuffle id of 0 resources
shuffleTaskManager.removeShuffleDataSync(appId, 0);
assertFalse(fs.exists(new Path(shufflePath0)));
assertTrue(fs.exists(new Path(appBasePath)));
assertNull(shuffleBufferManager.getBufferPool().get(appId).get(0));
assertNotNull(shuffleBufferManager.getBufferPool().get(appId).get(1));
shuffleTaskManager.removeResources(appId);
}
@Test
public void removeShuffleDataWithLocalfileTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 1000L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 100000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, "LOCALFILE");
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
java.nio.file.Path path1 = Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
java.nio.file.Path path2 = Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
path1.toAbsolutePath().toString() + "," + path2.toAbsolutePath().toString());
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "removeShuffleDataWithLocalfileTest";
int shuffleNum = 4;
for (int i = 0; i < shuffleNum; i++) {
shuffleTaskManager.registerShuffle(
appId,
i,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, i, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, i, partitionedData0.getBlockList());
}
assertEquals(1, shuffleTaskManager.getAppIds().size());
for (int i = 0; i < shuffleNum; i++) {
shuffleTaskManager.commitShuffle(appId, i);
shuffleTaskManager.removeShuffleDataSync(appId, i);
}
for (String path : conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH)) {
String appPath = path + "/" + appId;
File[] files = new File(appPath).listFiles();
if (files != null) {
assertEquals(0, files.length);
}
}
}
@Test
public void clearTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/clearTest";
final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
"clearTest1",
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.registerShuffle(
"clearTest2",
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.refreshAppId("clearTest1");
shuffleTaskManager.refreshAppId("clearTest2");
assertEquals(2, shuffleTaskManager.getAppIds().size());
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
// keep refresh status of application "clearTest1"
int retry = 0;
while (retry < 10) {
Thread.sleep(1000);
shuffleTaskManager.cacheShuffleData("clearTest1", shuffleId, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds("clearTest1", shuffleId, partitionedData0.getBlockList());
shuffleTaskManager.refreshAppId("clearTest1");
shuffleTaskManager.checkResourceStatus();
retry++;
}
// application "clearTest2" was removed according to rss.server.app.expired.withoutHeartbeat
assertEquals(Sets.newHashSet("clearTest1"), shuffleTaskManager.getAppIds());
assertEquals(1, shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).getLongCardinality());
// register again
shuffleTaskManager.registerShuffle(
"clearTest2",
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.refreshAppId("clearTest2");
shuffleTaskManager.checkResourceStatus();
assertEquals(Sets.newHashSet("clearTest1", "clearTest2"), shuffleTaskManager.getAppIds());
Thread.sleep(5000);
shuffleTaskManager.checkResourceStatus();
// wait resource delete
Thread.sleep(3000);
assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).isEmpty());
}
@Test
public void clearMultiTimesTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/clearMultiTimesTest";
final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "clearMultiTimesTest";
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.refreshAppId(appId);
assertEquals(1, shuffleTaskManager.getAppIds().size());
shuffleTaskManager.checkResourceStatus();
assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds());
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
shuffleTaskManager.removeResources(appId);
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
assertTrue(shuffleTaskManager.getCachedBlockIds(appId, shuffleId).isEmpty());
}
@Test
public void removeResourcesByShuffleIdsMultiTimesTest() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/removeResourcesByShuffleIdsMultiTimesTest";
final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
String appId = "removeResourcesByShuffleIdsMultiTimesTest";
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.refreshAppId(appId);
assertEquals(1, shuffleTaskManager.getAppIds().size());
shuffleTaskManager.checkResourceStatus();
assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds());
assertEquals(1, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
CountDownLatch countDownLatch = new CountDownLatch(3);
List<Integer> shuffleIds = Lists.newArrayList(shuffleId);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
shuffleTaskManager.removeResourcesByShuffleIds(appId, shuffleIds);
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
assertEquals(0, (int) ShuffleServerMetrics.gaugeTotalPartitionNum.get());
}
@Test
public void getBlockIdsByPartitionIdTest() {
ShuffleServerConf conf = new ShuffleServerConf();
ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
conf, null, null, null);
Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
int expectedPartitionId = 5;
Roaring64NavigableMap bitmapBlockIds = Roaring64NavigableMap.bitmapOf();
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
long blockId = getBlockId(partitionId, taskId, i);
bitmapBlockIds.addLong(blockId);
if (partitionId == expectedPartitionId) {
expectedBlockIds.addLong(blockId);
}
}
}
}
Roaring64NavigableMap resultBlockIds = shuffleTaskManager.getBlockIdsByPartitionId(
Sets.newHashSet(expectedPartitionId), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
assertEquals(expectedBlockIds, resultBlockIds);
bitmapBlockIds.addLong(getBlockId(0, 0, 0));
resultBlockIds = shuffleTaskManager.getBlockIdsByPartitionId(Sets.newHashSet(0), bitmapBlockIds,
Roaring64NavigableMap.bitmapOf());
assertEquals(Roaring64NavigableMap.bitmapOf(0L), resultBlockIds);
long expectedBlockId = getBlockId(
Constants.MAX_PARTITION_ID, Constants.MAX_TASK_ATTEMPT_ID, Constants.MAX_SEQUENCE_NO);
bitmapBlockIds.addLong(expectedBlockId);
resultBlockIds = shuffleTaskManager.getBlockIdsByPartitionId(Sets.newHashSet(Math.toIntExact(
Constants.MAX_PARTITION_ID)), bitmapBlockIds, Roaring64NavigableMap.bitmapOf());
assertEquals(Roaring64NavigableMap.bitmapOf(expectedBlockId), resultBlockIds);
}
@Test
public void getBlockIdsByMultiPartitionTest() {
ShuffleServerConf conf = new ShuffleServerConf();
ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(
conf, null, null, null);
Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
int startPartition = 3;
int endPartition = 5;
Roaring64NavigableMap bitmapBlockIds = Roaring64NavigableMap.bitmapOf();
for (int taskId = 1; taskId < 10; taskId++) {
for (int partitionId = 1; partitionId < 10; partitionId++) {
for (int i = 0; i < 2; i++) {
long blockId = getBlockId(partitionId, taskId, i);
bitmapBlockIds.addLong(blockId);
if (partitionId >= startPartition && partitionId <= endPartition) {
expectedBlockIds.addLong(blockId);
}
}
}
}
Set<Integer> requestPartitions = Sets.newHashSet();
Set<Integer> allPartitions = Sets.newHashSet();
for (int partitionId = 1; partitionId < 10; partitionId++) {
allPartitions.add(partitionId);
if (partitionId >= startPartition && partitionId <= endPartition) {
requestPartitions.add(partitionId);
}
}
Roaring64NavigableMap resultBlockIds =
shuffleTaskManager.getBlockIdsByPartitionId(requestPartitions, bitmapBlockIds,
Roaring64NavigableMap.bitmapOf());
assertEquals(expectedBlockIds, resultBlockIds);
assertEquals(bitmapBlockIds, shuffleTaskManager.getBlockIdsByPartitionId(allPartitions, bitmapBlockIds,
Roaring64NavigableMap.bitmapOf()));
}
@Test
public void testGetFinishedBlockIds() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/test";
String appId = "test_app";
final int shuffleId = 1;
final int bitNum = 3;
final int partitionNum = 10;
final int taskNum = 10;
final int blocksPerTask = 2;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager,
shuffleBufferManager, storageManager);
int startPartition = 6;
int endPartition = 9;
Roaring64NavigableMap expectedBlockIds = Roaring64NavigableMap.bitmapOf();
Map<Integer, long[]> blockIdsToReport = Maps.newHashMap();
for (int partitionId = 0; partitionId < partitionNum; partitionId++) {
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(partitionId, partitionId)),
new RemoteStorageInfo(storageBasePath),
StringUtils.EMPTY
);
long[] blockIds = new long[taskNum * blocksPerTask];
for (int taskId = 0; taskId < taskNum; taskId++) {
for (int i = 0; i < blocksPerTask; i++) {
long blockId = getBlockId(partitionId, taskId, i);
blockIds[taskId * blocksPerTask + i] = blockId;
}
}
blockIdsToReport.putIfAbsent(partitionId, blockIds);
if (partitionId >= startPartition) {
expectedBlockIds.add(blockIds);
}
}
assertEquals((endPartition - startPartition + 1) * taskNum * blocksPerTask,
expectedBlockIds.getLongCardinality());
shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum);
Set<Integer> requestPartitions = Sets.newHashSet();
for (int partitionId = startPartition; partitionId <= endPartition; partitionId++) {
requestPartitions.add(partitionId);
}
byte[] serializeBitMap =
shuffleTaskManager.getFinishedBlockIds(appId, shuffleId, requestPartitions);
Roaring64NavigableMap resBlockIds = RssUtils.deserializeBitMap(serializeBitMap);
assertEquals(expectedBlockIds, resBlockIds);
}
@Test
public void testAddFinishedBlockIdsWithoutRegister() throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
String storageBasePath = HDFS_URI + "rss/test";
String appId = "testAddFinishedBlockIdsToExpiredApp";
final int shuffleId = 1;
final int bitNum = 3;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
shuffleServer = new ShuffleServer(conf);
ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
ShuffleFlushManager shuffleFlushManager = shuffleServer.getShuffleFlushManager();
StorageManager storageManager = shuffleServer.getStorageManager();
ShuffleTaskManager shuffleTaskManager = new ShuffleTaskManager(conf, shuffleFlushManager,
shuffleBufferManager, storageManager);
Map<Integer, long[]> blockIdsToReport = Maps.newHashMap();
try {
shuffleTaskManager.addFinishedBlockIds(appId, shuffleId, blockIdsToReport, bitNum);
fail("Exception should be thrown");
} catch (RuntimeException e) {
assertTrue(e.getMessage().equals("appId[" + appId + "] is expired!"));
}
}
@Test
public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Exception {
final String appId = "clearLocalTest_appId";
ShuffleServerConf conf = new ShuffleServerConf();
final int shuffleId = 1;
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 64L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
// make sure not to check leak shuffle data automatically
conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL, 600 * 1000L);
shuffleServer = new ShuffleServer(conf);
ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
Lists.newArrayList(new PartitionRange(0, 1)),
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY
);
shuffleTaskManager.refreshAppId(appId);
assertEquals(1, shuffleTaskManager.getAppIds().size());
ShufflePartitionedData shuffleData = createPartitionedData(1, 1, 48);
// make sure shuffle data flush to disk
int retry = 0;
while (retry < 5) {
Thread.sleep(1000);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, shuffleData);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData.getBlockList());
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();
retry++;
}
StorageManager storageManager = shuffleServer.getStorageManager();
assertTrue(storageManager instanceof LocalStorageManager);
LocalStorageManager localStorageManager = (LocalStorageManager) storageManager;
// parse appIds from storage
Set<String> appIdsOnDisk = getAppIdsOnDisk(localStorageManager);
assertEquals(appIdsOnDisk.size(), shuffleTaskManager.getAppIds().size());
assertTrue(appIdsOnDisk.contains(appId));
// make sure heartbeat timeout and resources are removed
Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
() -> shuffleTaskManager.getAppIds().size() == 0);
// Create the hidden dir to simulate LocalStorageChecker's check
String storageDir = tempDir.getAbsolutePath();
File hiddenFile = new File(storageDir + "/" + LocalStorageChecker.CHECKER_DIR_NAME);
hiddenFile.mkdir();
appIdsOnDisk = getAppIdsOnDisk(localStorageManager);
assertFalse(appIdsOnDisk.contains(appId));
assertFalse(appIdsOnDisk.contains(LocalStorageChecker.CHECKER_DIR_NAME));
// mock leak shuffle data
File file = new File(tempDir, appId);
assertFalse(file.exists());
file.mkdir();
assertTrue(file.exists());
// execute checkLeakShuffleData
shuffleTaskManager.checkLeakShuffleData();
assertFalse(file.exists());
assertTrue(hiddenFile.exists());
}
private Set<String> getAppIdsOnDisk(LocalStorageManager localStorageManager) {
Set<String> appIdsOnDisk = new HashSet<>();
List<LocalStorage> storages = localStorageManager.getStorages();
for (LocalStorage storage : storages) {
appIdsOnDisk.addAll(storage.getAppIds());
}
return appIdsOnDisk;
}
// copy from ClientUtils
private Long getBlockId(long partitionId, long taskAttemptId, long atomicInt) {
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
+ (partitionId << Constants.TASK_ATTEMPT_ID_MAX_LENGTH) + taskAttemptId;
}
private void waitForFlush(ShuffleFlushManager shuffleFlushManager,
String appId, int shuffleId, int expectedBlockNum) throws Exception {
int retry = 0;
while (true) {
// remove flushed eventId to test timeout in commit
if (shuffleFlushManager.getCommittedBlockIds(appId, shuffleId).getIntCardinality() == expectedBlockNum) {
break;
}
Thread.sleep(1000);
retry++;
if (retry > 5) {
fail("Timeout to flush data");
}
}
}
private ShufflePartitionedData createPartitionedData(int partitionId, int blockNum, int dataLength) {
ShufflePartitionedBlock[] blocks = createBlock(blockNum, dataLength);
return new ShufflePartitionedData(partitionId, blocks);
}
private ShufflePartitionedBlock[] createBlock(int num, int length) {
ShufflePartitionedBlock[] blocks = new ShufflePartitionedBlock[num];
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
blocks[i] = new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), ATOMIC_INT.incrementAndGet(), 0, buf);
}
return blocks;
}
private void validate(String appId, int shuffleId, int partitionId, List<ShufflePartitionedBlock> blocks,
String basePath) {
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
Set<Long> remainIds = Sets.newHashSet();
for (ShufflePartitionedBlock spb : blocks) {
expectBlockIds.addLong(spb.getBlockId());
remainIds.add(spb.getBlockId());
}
HdfsClientReadHandler handler = new HdfsClientReadHandler(appId, shuffleId, partitionId,
100, 1, 10, 1000, expectBlockIds, processBlockIds, basePath, new Configuration());
ShuffleDataResult sdr = handler.readShuffleData();
List<BufferSegment> bufferSegments = sdr.getBufferSegments();
int matchNum = 0;
for (ShufflePartitionedBlock block : blocks) {
for (BufferSegment bs : bufferSegments) {
if (bs.getBlockId() == block.getBlockId()) {
assertEquals(block.getLength(), bs.getLength());
assertEquals(block.getCrc(), bs.getCrc());
matchNum++;
break;
}
}
}
assertEquals(blocks.size(), matchNum);
}
}