blob: 7fe6f3bc0e872ce2cd8823eab6cfc9d101e07c76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.server.storage;
import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.common.HadoopStorage;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HybridStorageManagerTest {

  /**
   * Builds a {@link ShuffleServerConf} for a {@code MEMORY_LOCALFILE_HDFS} hybrid setup with a
   * single local base path named "test".
   *
   * @param coldStorageThresholdSize value for {@code FLUSH_COLD_STORAGE_THRESHOLD_SIZE}
   * @param diskCapacity value for {@code DISK_CAPACITY}
   * @return a fresh configuration that callers may further customize
   */
  private static ShuffleServerConf newHybridConf(long coldStorageThresholdSize, long diskCapacity) {
    ShuffleServerConf conf = new ShuffleServerConf();
    conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, coldStorageThresholdSize);
    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
    conf.setLong(ShuffleServerConf.DISK_CAPACITY, diskCapacity);
    conf.setString(
        ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
    return conf;
  }

  /**
   * Creates a one-element block list. {@code length} is passed as the block's first constructor
   * argument (presumably the block length — confirm against {@link ShufflePartitionedBlock}).
   */
  private static List<ShufflePartitionedBlock> singleBlock(int length) {
    return Lists.newArrayList(new ShufflePartitionedBlock(length, 1000, 1, 1, 1L, (byte[]) null));
  }

  /**
   * Creates a flush event for {@code appId} with the given size and blocks. All numeric ids
   * (event, shuffle and partition range) are fixed to 1, as in every test of this class.
   */
  private static ShuffleDataFlushEvent newFlushEvent(
      String appId, int size, List<ShufflePartitionedBlock> blocks) {
    return new ShuffleDataFlushEvent(1, appId, 1, 1, 1, size, blocks, null, null);
  }

  /**
   * Tests the fallback strategy when the local storage is corrupted.
   *
   * <p>With {@code LocalStorageManagerFallbackStrategy} configured and the only local storage
   * marked corrupted, the event is expected to be taken by Hadoop storage in both cases:
   *
   * <ol>
   *   <li>{@code FALLBACK_MAX_FAIL_TIMES} &lt; 0 (here -1);
   *   <li>{@code FALLBACK_MAX_FAIL_TIMES} = 0 — fallback still applies, the event is not dropped.
   * </ol>
   */
  @Test
  public void fallbackTestWhenLocalStorageCorrupted() {
    ShuffleServerConf conf = newHybridConf(2000L, 1024L * 1024L * 1024L);
    conf.setString(
        ShuffleServerConf.HYBRID_STORAGE_MANAGER_SELECTOR_CLASS,
        "org.apache.uniffle.server.storage.hybrid.HugePartitionSensitiveStorageManagerSelector");
    conf.setString(
        ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
        "org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy");

    // case1: fallback to Hadoop storage when FALLBACK_MAX_FAIL_TIMES = -1
    conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, -1);
    HybridStorageManager manager = new HybridStorageManager(conf);
    LocalStorageManager localStorageManager = (LocalStorageManager) manager.getWarmStorageManager();
    localStorageManager.getStorages().get(0).markCorrupted();
    String remoteStorage = "test";
    String appId = "selectStorageManagerWithSelectorAndFallbackStrategy_appId";
    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
    List<ShufflePartitionedBlock> blocks = singleBlock(100);
    ShuffleDataFlushEvent event = newFlushEvent(appId, 1000, blocks);
    assertTrue(manager.selectStorage(event) instanceof HadoopStorage);

    // case2: fallback is still applied (event goes to Hadoop) when FALLBACK_MAX_FAIL_TIMES = 0
    conf.setLong(ShuffleServerConf.FALLBACK_MAX_FAIL_TIMES, 0);
    manager = new HybridStorageManager(conf);
    localStorageManager = (LocalStorageManager) manager.getWarmStorageManager();
    localStorageManager.getStorages().get(0).markCorrupted();
    event = newFlushEvent(appId, 1000, blocks);
    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
    assertTrue(manager.selectStorage(event) instanceof HadoopStorage);
  }

  /**
   * With the default selector, an event smaller than {@code FLUSH_COLD_STORAGE_THRESHOLD_SIZE}
   * (2000 here) goes to local storage, and a much larger one goes to Hadoop storage.
   */
  @Test
  public void selectStorageManagerTest() {
    ShuffleServerConf conf = newHybridConf(2000L, 1024L * 1024L * 1024L);
    HybridStorageManager manager = new HybridStorageManager(conf);
    String remoteStorage = "test";
    String appId = "selectStorageManagerTest_appId";
    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));
    List<ShufflePartitionedBlock> blocks = singleBlock(100);

    ShuffleDataFlushEvent event = newFlushEvent(appId, 1000, blocks);
    assertTrue(manager.selectStorage(event) instanceof LocalStorage);

    event = newFlushEvent(appId, 1000000, blocks);
    assertTrue(manager.selectStorage(event) instanceof HadoopStorage);
  }

  /**
   * With {@code HugePartitionSensitiveStorageManagerSelector}, only events owned by a huge
   * partition are sent to cold storage. A regular event above the cold threshold stays local,
   * while a tiny event marked as owned by a huge partition goes to Hadoop storage.
   */
  @Test
  public void testStorageManagerSelectorOfPreferCold() {
    ShuffleServerConf conf = newHybridConf(10000L, 10000L);
    conf.setString(
        ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
        RotateStorageManagerFallbackStrategy.class.getCanonicalName());
    conf.setString(
        ShuffleServerConf.HYBRID_STORAGE_MANAGER_SELECTOR_CLASS,
        "org.apache.uniffle.server.storage.hybrid.HugePartitionSensitiveStorageManagerSelector");
    HybridStorageManager manager = new HybridStorageManager(conf);
    String remoteStorage = "test";
    String appId = "selectStorageManagerIfCanNotWriteTest_appId";
    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));

    // case1: only events owned by a huge partition are flushed to cold storage when the
    // ColdStoragePreferredFactor.HUGE_PARTITION factor of StorageManagerSelector is enabled.
    List<ShufflePartitionedBlock> blocks = singleBlock(10001);
    ShuffleDataFlushEvent event = newFlushEvent(appId, 100000, blocks);
    Storage storage = manager.selectStorage(event);
    assertTrue(storage instanceof LocalStorage);

    ShuffleDataFlushEvent hugePartitionEvent = newFlushEvent(appId, 10, blocks);
    hugePartitionEvent.markOwnedByHugePartition();
    storage = manager.selectStorage(hugePartitionEvent);
    assertTrue(storage instanceof HadoopStorage);
  }

  /**
   * With the default selector and {@code RotateStorageManagerFallbackStrategy}:
   *
   * <ol>
   *   <li>an event above the cold threshold is written to Hadoop storage directly;
   *   <li>a small event first lands on local storage, and once that local storage is marked
   *       corrupted the next event falls back to Hadoop storage.
   * </ol>
   */
  @Test
  public void underStorageManagerSelectionTest() {
    ShuffleServerConf conf = newHybridConf(10000L, 10000L);
    conf.setString(
        ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
        RotateStorageManagerFallbackStrategy.class.getCanonicalName());
    HybridStorageManager manager = new HybridStorageManager(conf);
    String remoteStorage = "test";
    String appId = "selectStorageManagerIfCanNotWriteTest_appId";
    manager.registerRemoteStorage(appId, new RemoteStorageInfo(remoteStorage));

    // case1: a big event should be written into cold storage directly
    List<ShufflePartitionedBlock> blocks = singleBlock(10001);
    ShuffleDataFlushEvent hugeEvent = newFlushEvent(appId, 10001, blocks);
    assertTrue(manager.selectStorage(hugeEvent) instanceof HadoopStorage);

    // case2: fall back to cold storage when the local disk can not be written
    blocks = singleBlock(100);
    ShuffleDataFlushEvent event = newFlushEvent(appId, 1000, blocks);
    Storage storage = manager.selectStorage(event);
    assertTrue(storage instanceof LocalStorage);
    ((LocalStorage) storage).markCorrupted();
    event = newFlushEvent(appId, 1000, blocks);
    assertTrue(manager.selectStorage(event) instanceof HadoopStorage);
  }
}