/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.server;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.server.storage.HadoopStorageManager;
import org.apache.uniffle.server.storage.HybridStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.common.AbstractStorage;
import org.apache.uniffle.storage.common.HadoopStorage;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
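/**
 * Tests for {@link ShuffleFlushManager}: flushing shuffle data to HDFS and local storage,
 * write metrics, resource cleanup on app/shuffle purge, and hybrid-storage fallback.
 */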
public class ShuffleFlushManagerTest extends HadoopTestBase {
private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
private static final AtomicLong ATOMIC_LONG = new AtomicLong(0);
private ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
private RemoteStorageInfo remoteStorage =
new RemoteStorageInfo(HDFS_URI + "rss/test", Maps.newHashMap());
private static ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
@BeforeAll
public static void beforeAll() throws Exception {
ShuffleTaskManager shuffleTaskManager = mock(ShuffleTaskManager.class);
ShuffleBufferManager shuffleBufferManager = mock(ShuffleBufferManager.class);
when(mockShuffleServer.getShuffleTaskManager()).thenReturn(shuffleTaskManager);
when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
}
@BeforeEach
public void prepare() {
ShuffleServerMetrics.register();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
shuffleServerConf.setInteger(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, 1);
LogManager.getRootLogger().setLevel(Level.INFO);
}
@AfterEach
public void clear() {
ShuffleServerMetrics.clear();
}
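/**
 * Config keys prefixed with "rss.server.hadoop." should be forwarded, with the prefix stripped,
 * into the Hadoop {@link Configuration} used by the flush manager.
 */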
@Test
public void hadoopConfTest() {
shuffleServerConf.setString("rss.server.hadoop.dfs.replication", "2");
shuffleServerConf.setString("rss.server.hadoop.a.b", "value");
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
assertEquals("2", manager.getHadoopConf().get("dfs.replication"));
assertEquals("value", manager.getHadoopConf().get("a.b"));
}
/**
 * When concurrent writing of a single partition's data is enabled, flushing events one at a time
 * should still produce exactly one file, since there is no race between writers.
 */
@Test
public void concurrentWrite2HdfsWriteOneByOne() throws Exception {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
int maxConcurrency = 3;
shuffleServerConf.setInteger(
ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, maxConcurrency);
String appId = "concurrentWrite2HdfsWriteOneByOne_appId";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
for (int i = 0; i < 10; i++) {
ShuffleDataFlushEvent shuffleDataFlushEvent =
createShuffleDataFlushEvent(appId, i, 1, 1, null);
manager.addToFlushQueue(shuffleDataFlushEvent);
waitForFlush(manager, appId, i, 5);
}
FileStatus[] fileStatuses = fs.listStatus(new Path(HDFS_URI + "/rss/test/" + appId + "/1/1-1"));
long actual =
Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("data")).count();
assertEquals(1, actual);
actual =
Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("index")).count();
assertEquals(1, actual);
}
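/**
 * When 20 flush events for the same partition are queued at once with a max concurrency of 3,
 * the concurrent writers should produce exactly maxConcurrency data/index file pairs.
 */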
@Test
public void concurrentWrite2HdfsWriteOfSinglePartition() throws Exception {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.HDFS.name());
int maxConcurrency = 3;
shuffleServerConf.setInteger(
ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, maxConcurrency);
String appId = "concurrentWrite2HdfsWriteOfSinglePartition_appId";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
IntStream.range(0, 20)
.forEach(
x -> {
ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
manager.addToFlushQueue(event);
});
waitForFlush(manager, appId, 1, 20 * 5);
FileStatus[] fileStatuses = fs.listStatus(new Path(HDFS_URI + "/rss/test/" + appId + "/1/1-1"));
long actual =
Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("data")).count();
assertEquals(maxConcurrency, actual);
actual =
Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("index")).count();
assertEquals(maxConcurrency, actual);
}
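/**
 * Flushes events for two shuffles to remote storage, then verifies the written blocks are
 * readable, committed block ids are tracked, and the remote-write counters advance from 0 to 3.
 * Finally processes an event for an app whose resources were already cleared and expects the
 * write handler gauge to drop back to 0.
 */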
@Test
public void writeTest() throws Exception {
String appId = "writeTest_appId";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId, remoteStorage);
String storageHost = cluster.getURI().getHost();
assertEquals(
0.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite
.labels(Constants.SHUFFLE_SERVER_VERSION, storageHost)
.get(),
0.5);
assertEquals(
0.0,
ShuffleServerMetrics.counterRemoteStorageRetryWrite
.labels(Constants.SHUFFLE_SERVER_VERSION, storageHost)
.get(),
0.5);
assertEquals(
0.0,
ShuffleServerMetrics.counterRemoteStorageFailedWrite
.labels(Constants.SHUFFLE_SERVER_VERSION, storageHost)
.get(),
0.5);
assertEquals(
0.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite
.labels(Constants.SHUFFLE_SERVER_VERSION, storageHost)
.get(),
0.5);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
final List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event21 = createShuffleDataFlushEvent(appId, 2, 2, 2, null);
final List<ShufflePartitionedBlock> blocks21 = event21.getShuffleBlocks();
manager.addToFlushQueue(event21);
ShuffleDataFlushEvent event22 = createShuffleDataFlushEvent(appId, 2, 2, 2, null);
final List<ShufflePartitionedBlock> blocks22 = event22.getShuffleBlocks();
manager.addToFlushQueue(event22);
// wait for the data to be written
waitForFlush(manager, appId, 1, 5);
waitForFlush(manager, appId, 2, 10);
validate(appId, 1, 1, blocks1, 1, remoteStorage.getPath());
assertEquals(blocks1.size(), manager.getCommittedBlockIds(appId, 1).getLongCardinality());
blocks21.addAll(blocks22);
validate(appId, 2, 2, blocks21, 1, remoteStorage.getPath());
assertEquals(blocks21.size(), manager.getCommittedBlockIds(appId, 2).getLongCardinality());
assertEquals(
3.0,
ShuffleServerMetrics.counterRemoteStorageTotalWrite
.labels(Constants.SHUFFLE_SERVER_VERSION, storageHost)
.get(),
0.5);
assertEquals(
3.0,
ShuffleServerMetrics.counterRemoteStorageSuccessWrite
.labels(Constants.SHUFFLE_SERVER_VERSION, storageHost)
.get(),
0.5);
// test case: process an event whose related app has already been cleared
assertEquals(0, ShuffleServerMetrics.gaugeWriteHandler.get(), 0.5);
ShuffleDataFlushEvent fakeEvent = createShuffleDataFlushEvent("fakeAppId", 1, 1, 1, null);
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock("fakeAppId"))
.thenReturn(rsLock.readLock());
manager.addToFlushQueue(fakeEvent);
waitForQueueClear(manager);
waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
}
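/**
 * Each flush event created by {@link #createShuffleDataFlushEvent} carries 5 blocks of 32 bytes,
 * so the local storage disk-size metadata should grow by 160 bytes per flushed event.
 */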
@Test
public void localMetricsTest(@TempDir File tempDir) throws Exception {
shuffleServerConf.set(
ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
String appId = "localMetricsTest_appId";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
manager.addToFlushQueue(event1);
// wait for the data to be written
waitForFlush(manager, appId, 1, 5);
validateLocalMetadata(storageManager, 0, 160L);
ShuffleDataFlushEvent event12 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
manager.addToFlushQueue(event12);
// wait for the data to be written
waitForFlush(manager, appId, 1, 10);
validateLocalMetadata(storageManager, 0, 320L);
}
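/**
 * Flushes three events of 1000, 1010, and 1020 bytes (10 blocks of 100/101/102 bytes) and checks
 * the per-disk metadata and counterTotalLocalFileWriteDataSize labels. The exact-size assertions
 * rely on each event landing on a different one of the three 1024-byte-capacity disks.
 */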
@Test
public void totalLocalFileWriteDataMetricTest() throws Exception {
List<String> storagePaths = Arrays.asList("/tmp/rss-data1", "/tmp/rss-data2", "/tmp/rss-data3");
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, storagePaths);
shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
String appId = "localMetricsTest_appId";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
ShuffleDataFlushEvent flushEvent = createShuffleDataFlushEvent(appId, 1, 1, 1, 10, 100, null);
manager.addToFlushQueue(flushEvent);
// wait for the data to be written
waitForFlush(manager, appId, 1, 10);
int storageIndex = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
validateLocalMetadata(storageManager, storageIndex, 1000L);
flushEvent = createShuffleDataFlushEvent(appId, 2, 1, 1, 10, 101, null);
manager.addToFlushQueue(flushEvent);
// wait for the data to be written
waitForFlush(manager, appId, 2, 10);
int storageIndex1 = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
validateLocalMetadata(storageManager, storageIndex1, 1010L);
storageManager.getStorageChecker().checkIsHealthy();
flushEvent = createShuffleDataFlushEvent(appId, 3, 1, 1, 10, 102, null);
manager.addToFlushQueue(flushEvent);
// wait for the data to be written
waitForFlush(manager, appId, 3, 10);
int storageIndex2 = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath());
validateLocalMetadata(storageManager, storageIndex2, 1020L);
assertEquals(
1000L,
ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
.labels(storagePaths.get(storageIndex))
.get());
assertEquals(
1010L,
ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
.labels(storagePaths.get(storageIndex1))
.get());
assertEquals(
1020L,
ShuffleServerMetrics.counterTotalLocalFileWriteDataSize
.labels(storagePaths.get(storageIndex2))
.get());
}
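/**
 * Two threads concurrently enqueue 30 flush events each for the same partition; all
 * 60 * 5 = 300 blocks must end up committed and readable from remote storage.
 */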
@Test
public void complexWriteTest() throws Exception {
shuffleServerConf.setString("rss.server.flush.handler.expired", "3");
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
String appId = "complexWriteTest_appId";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
storageManager.registerRemoteStorage(appId, remoteStorage);
List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList();
List<ShuffleDataFlushEvent> flushEvents1 = Lists.newArrayList();
List<ShuffleDataFlushEvent> flushEvents2 = Lists.newArrayList();
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
for (int i = 0; i < 30; i++) {
ShuffleDataFlushEvent flushEvent1 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
ShuffleDataFlushEvent flushEvent2 = createShuffleDataFlushEvent(appId, 1, 1, 1, null);
expectedBlocks.addAll(flushEvent1.getShuffleBlocks());
expectedBlocks.addAll(flushEvent2.getShuffleBlocks());
flushEvents1.add(flushEvent1);
flushEvents2.add(flushEvent2);
}
Thread flushThread1 =
new Thread(
() -> {
for (ShuffleDataFlushEvent event : flushEvents1) {
manager.addToFlushQueue(event);
}
});
Thread flushThread2 =
new Thread(
() -> {
for (ShuffleDataFlushEvent event : flushEvents2) {
manager.addToFlushQueue(event);
}
});
flushThread1.start();
flushThread2.start();
flushThread1.join();
flushThread2.join();
waitForFlush(manager, appId, 1, 300);
validate(appId, 1, 1, expectedBlocks, 1, remoteStorage.getPath());
}
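/**
 * Verifies that purging an app removes its HDFS files, storage handlers, and committed block ids
 * without affecting other apps, and that a purge still deletes a directory that was recreated
 * after the appIdToStorages cache entry had been evicted.
 */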
@Test
public void clearTest() throws Exception {
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
String appId1 = "complexWriteTest_appId1";
String appId2 = "complexWriteTest_appId2";
ReentrantReadWriteLock rsLock1 = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId1))
.thenReturn(rsLock1.readLock());
ReentrantReadWriteLock rsLock2 = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId2))
.thenReturn(rsLock2.readLock());
storageManager.registerRemoteStorage(appId1, remoteStorage);
storageManager.registerRemoteStorage(appId2, remoteStorage);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event2 = createShuffleDataFlushEvent(appId2, 1, 0, 1, null);
manager.addToFlushQueue(event2);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
int size = storage.getHandlerSize();
assertEquals(2, size);
FileStatus[] fileStatus = fs.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
assertTrue(fileStatus.length > 0);
manager.removeResources(appId1);
assertTrue(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId1));
storageManager.removeResources(new AppPurgeEvent(appId1, StringUtils.EMPTY, Arrays.asList(1)));
assertFalse(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId1));
try {
fs.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
fail("Exception should be thrown");
} catch (FileNotFoundException fnfe) {
// expected exception
}
assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
size = storage.getHandlerSize();
assertEquals(1, size);
manager.removeResources(appId2);
assertTrue(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2));
storageManager.removeResources(new AppPurgeEvent(appId2, StringUtils.EMPTY, Arrays.asList(1)));
assertFalse(((HadoopStorageManager) storageManager).getAppIdToStorages().containsKey(appId2));
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
size = storage.getHandlerSize();
assertEquals(0, size);
// A directory for appId2 is recreated in HDFS after its resources were removed. Since the entry
// in appIdToStorages is already gone, a later purge must still delete the path from HDFS.
Path path = new Path(remoteStorage.getPath() + "/" + appId2 + "/");
assertTrue(fs.mkdirs(path));
storageManager.removeResources(
new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
assertFalse(fs.exists(path));
HadoopStorage storageByAppId =
((HadoopStorageManager) storageManager).getStorageByAppId(appId2);
assertNull(storageByAppId);
}
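/**
 * Cleanup checks for local storage: a shuffle-level purge removes only the targeted shuffle's
 * data, while an app-level purge removes the app directory and all of its handlers.
 */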
@Test
public void clearLocalTest(@TempDir File tempDir) throws Exception {
final String appId1 = "clearLocalTest_appId1";
final String appId2 = "clearLocalTest_appId12";
ReentrantReadWriteLock rsLock1 = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId1))
.thenReturn(rsLock1.readLock());
ReentrantReadWriteLock rsLock2 = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId2))
.thenReturn(rsLock2.readLock());
ShuffleServerConf serverConf = new ShuffleServerConf();
serverConf.set(
ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());
serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(serverConf);
ShuffleFlushManager manager =
new ShuffleFlushManager(serverConf, mockShuffleServer, storageManager);
ShuffleDataFlushEvent event1 = createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event2 = createShuffleDataFlushEvent(appId2, 1, 0, 1, null);
manager.addToFlushQueue(event2);
ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 1, null);
manager.addToFlushQueue(event3);
ShuffleDataFlushEvent event5 = createShuffleDataFlushEvent(appId2, 11, 0, 1, null);
manager.addToFlushQueue(event5);
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
waitForFlush(manager, appId2, 2, 5);
waitForFlush(manager, appId2, 11, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 11).getLongCardinality());
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
storageManager.removeResources(
new ShufflePurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
ShuffleDataFlushEvent event4 = createShuffleDataFlushEvent(appId1, 1, 0, 1, () -> false);
manager.addToFlushQueue(event4);
Thread.sleep(1000);
storageManager.removeResources(
new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
manager.removeResources(appId1);
assertFalse(file.exists());
ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
assertNotNull(storageManager.selectStorage(shuffleReadEvent));
assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
ShuffleDataReadEvent shuffle1ReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
ShuffleDataReadEvent shuffle11ReadEvent = new ShuffleDataReadEvent(appId2, 11, 0, 0);
assertNull(storageManager.selectStorage(shuffle1ReadEvent));
assertNotNull(storageManager.selectStorage(shuffle11ReadEvent));
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2)));
storageManager.removeResources(
new AppPurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(0, storage.getHandlerSize());
assertEquals(
0, ((LocalStorageManager) storageManager).getSortedPartitionsOfStorageMap().size());
}
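/** Polls the gauge every 500 ms until it reaches the expected value, failing after ~5 seconds. */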
private void waitForMetrics(Gauge.Child gauge, double expected, double delta) throws Exception {
int retry = 0;
boolean match = false;
do {
Thread.sleep(500);
if (retry > 10) {
fail("Unexpected flush process when waitForMetrics");
}
retry++;
try {
assertEquals(expected, gauge.get(), delta);
match = true;
} catch (AssertionError e) {
// metric has not reached the expected value yet; retry
}
} while (!match);
}
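/** Polls every 500 ms until the flush queue is empty, failing after ~5 seconds. */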
private void waitForQueueClear(ShuffleFlushManager manager) throws Exception {
int retry = 0;
int size = 0;
do {
Thread.sleep(500);
if (retry > 10) {
fail("Unexpected flush process when waitForQueueClear");
}
retry++;
size = manager.getEventNumInFlush();
} while (size > 0);
}
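/**
 * Polls every 500 ms until at least expectedBlockNum block ids are committed for the given
 * shuffle, failing after ~50 seconds.
 */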
public static void waitForFlush(
ShuffleFlushManager manager, String appId, int shuffleId, int expectedBlockNum)
throws Exception {
int retry = 0;
int size = 0;
do {
Thread.sleep(500);
if (retry > 100) {
fail("Unexpected flush process");
}
retry++;
size = manager.getCommittedBlockIds(appId, shuffleId).getIntCardinality();
} while (size < expectedBlockNum);
}
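/** Creates a flush event with the default payload of 5 random 32-byte blocks and an event size of 1. */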
public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
String appId,
int shuffleId,
int startPartition,
int endPartition,
Supplier<Boolean> isValid) {
return createShuffleDataFlushEvent(appId, shuffleId, startPartition, endPartition, isValid, 1);
}
public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
String appId,
int shuffleId,
int startPartition,
int endPartition,
Supplier<Boolean> isValid,
int size) {
List<ShufflePartitionedBlock> spbs = createBlock(5, 32);
return new ShuffleDataFlushEvent(
ATOMIC_LONG.getAndIncrement(),
appId,
shuffleId,
startPartition,
endPartition,
size,
spbs,
isValid,
null);
}
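/** Creates a flush event with {@code blockNum} blocks of {@code blockSize} random bytes each. */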
public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
String appId,
int shuffleId,
int startPartition,
int endPartition,
int blockNum,
int blockSize,
Supplier<Boolean> isValid) {
return new ShuffleDataFlushEvent(
ATOMIC_LONG.getAndIncrement(),
appId,
shuffleId,
startPartition,
endPartition,
(long) blockNum * blockSize,
createBlock(blockNum, blockSize),
isValid,
null);
}
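/** Generates {@code num} blocks of random data, each {@code length} bytes with a CRC32 checksum. */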
public static List<ShufflePartitionedBlock> createBlock(int num, int length) {
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
new Random().nextBytes(buf);
blocks.add(
new ShufflePartitionedBlock(
length, length, ChecksumUtils.getCrc32(buf), ATOMIC_INT.incrementAndGet(), 0, buf));
}
return blocks;
}
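/**
 * Reads the partition back through a {@link HadoopClientReadHandler} and asserts that every
 * expected block id appears in the returned buffer segments.
 */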
private void validate(
String appId,
int shuffleId,
int partitionId,
List<ShufflePartitionedBlock> blocks,
int partitionNumPerRange,
String basePath) {
Roaring64NavigableMap expectBlockIds = Roaring64NavigableMap.bitmapOf();
Roaring64NavigableMap processBlockIds = Roaring64NavigableMap.bitmapOf();
Set<Long> remainIds = Sets.newHashSet();
for (ShufflePartitionedBlock spb : blocks) {
expectBlockIds.addLong(spb.getBlockId());
remainIds.add(spb.getBlockId());
}
HadoopClientReadHandler handler =
new HadoopClientReadHandler(
appId,
shuffleId,
partitionId,
100,
partitionNumPerRange,
10,
blocks.size() * 32,
expectBlockIds,
processBlockIds,
basePath,
new Configuration());
ShuffleDataResult sdr = handler.readShuffleData();
int matchNum = 0;
List<BufferSegment> bufferSegments = sdr.getBufferSegments();
for (ShufflePartitionedBlock block : blocks) {
for (BufferSegment bs : bufferSegments) {
if (bs.getBlockId() == block.getBlockId()) {
matchNum++;
break;
}
}
}
for (BufferSegment bs : bufferSegments) {
remainIds.remove(bs.getBlockId());
}
assertEquals(blocks.size(), matchNum);
}
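/**
 * With hybrid storage and {@link LocalStorageManagerFallbackStrategy}: a small event goes to
 * local storage, an event over the cold-storage threshold goes straight to HDFS, and once the
 * local disk is reported full the next small event falls back to HDFS as well.
 */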
@Test
public void fallbackWrittenWhenHybridStorageManagerEnableTest(@TempDir File tempDir)
throws InterruptedException {
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
shuffleServerConf.setString(
RssBaseConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.toString());
shuffleServerConf.set(
RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
shuffleServerConf.setString(
ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
LocalStorageManagerFallbackStrategy.class.getCanonicalName());
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
String appId = "fallbackWrittenWhenMultiStorageManagerEnableTest";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
storageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage.getPath()));
ShuffleFlushManager flushManager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
// case1: normally written to local storage
ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
Thread.sleep(1000);
assertTrue(event.getUnderStorage() instanceof LocalStorage);
storageManager.getStorageChecker().checkIsHealthy();
// case2: huge event is written to cold storage directly
event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
flushManager.addToFlushQueue(event);
Thread.sleep(1000);
assertTrue(event.getUnderStorage() instanceof HadoopStorage);
assertEquals(0, event.getRetryTimes());
// case3: local disk is full or corrupted, fall back to HDFS
List<ShufflePartitionedBlock> blocks =
Lists.newArrayList(new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L, (byte[]) null));
ShuffleDataFlushEvent bigEvent =
new ShuffleDataFlushEvent(1, "1", 2, 1, 1, 100, blocks, null, null);
bigEvent.setUnderStorage(
((HybridStorageManager) storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager) storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
storageManager.getStorageChecker().checkIsHealthy();
event = createShuffleDataFlushEvent(appId, 3, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
Thread.sleep(1000);
assertTrue(event.getUnderStorage() instanceof HadoopStorage);
}
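/**
 * Same fallback scenario as above, additionally asserting that the per-storage flush event
 * counters (local file vs. Hadoop) are incremented for each case.
 */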
@Test
public void defaultFlushEventHandlerTest(@TempDir File tempDir) throws Exception {
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 10000L);
shuffleServerConf.setString(
RssBaseConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE_HDFS.toString());
shuffleServerConf.set(
RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
shuffleServerConf.set(ShuffleServerConf.SERVER_FLUSH_HADOOP_THREAD_POOL_SIZE, 1);
shuffleServerConf.set(ShuffleServerConf.SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE, 1);
shuffleServerConf.setString(
ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
LocalStorageManagerFallbackStrategy.class.getCanonicalName());
StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
String appId = "fallbackWrittenWhenMultiStorageManagerEnableTest";
ReentrantReadWriteLock rsLock = new ReentrantReadWriteLock();
when(mockShuffleServer.getShuffleTaskManager().getAppReadLock(appId))
.thenReturn(rsLock.readLock());
storageManager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage.getPath()));
ShuffleFlushManager flushManager =
new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
ShuffleServerMetrics.counterLocalFileEventFlush.clear();
ShuffleServerMetrics.counterHadoopEventFlush.clear();
// case1: normally written to local storage
ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
waitForFlush(flushManager, appId, 1, 5);
assertEquals(0, event.getRetryTimes());
assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
storageManager.getStorageChecker().checkIsHealthy();
// case2: huge event is written to cold storage directly
event = createShuffleDataFlushEvent(appId, 1, 1, 1, null, 100000);
flushManager.addToFlushQueue(event);
waitForFlush(flushManager, appId, 1, 10);
assertEquals(0, event.getRetryTimes());
assertEquals(1, ShuffleServerMetrics.counterHadoopEventFlush.get());
// case3: local disk is full or corrupted, fall back to HDFS
List<ShufflePartitionedBlock> blocks =
Lists.newArrayList(new ShufflePartitionedBlock(100000, 1000, 1, 1, 1L, (byte[]) null));
ShuffleDataFlushEvent bigEvent =
new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 100, blocks, null, null);
bigEvent.setUnderStorage(
((HybridStorageManager) storageManager).getWarmStorageManager().selectStorage(event));
((HybridStorageManager) storageManager).getWarmStorageManager().updateWriteMetrics(bigEvent, 0);
storageManager.getStorageChecker().checkIsHealthy();
event = createShuffleDataFlushEvent(appId, 2, 1, 1, null, 100);
flushManager.addToFlushQueue(event);
waitForFlush(flushManager, appId, 2, 5);
assertEquals(0, event.getRetryTimes());
assertEquals(1, ShuffleServerMetrics.counterLocalFileEventFlush.get());
assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get());
}
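/** Asserts the disk-size metadata tracked for the local storage at the given index. */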
private void validateLocalMetadata(StorageManager storageManager, int storageIndex, Long size) {
assertInstanceOf(LocalStorageManager.class, storageManager);
LocalStorage localStorage =
((LocalStorageManager) storageManager).getStorages().get(storageIndex);
assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
}
}