blob: 663a7bfc51f7144c0f9a7f1b3b76644ff02d8183 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.server.storage;
import java.io.File;
import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.storage.common.HadoopStorage;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HadoopStorageManagerTest {
@BeforeAll
public static void prepare() {
ShuffleServerMetrics.register();
}
@AfterAll
public static void clear() {
ShuffleServerMetrics.clear();
}
@Test
public void testRemoveResources() {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
final String remoteStoragePath1 = "hdfs://path1";
String appId = "testRemoveResources_appId";
hadoopStorageManager.registerRemoteStorage(
appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
// case1
assertEquals(1, appStorageMap.size());
ShufflePurgeEvent shufflePurgeEvent = new ShufflePurgeEvent(appId, "", Arrays.asList(1));
hadoopStorageManager.removeResources(shufflePurgeEvent);
assertEquals(1, appStorageMap.size());
// case2
AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
hadoopStorageManager.removeResources(appPurgeEvent);
assertEquals(0, appStorageMap.size());
}
@Test
public void testRegisterRemoteStorage() {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
conf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
final String remoteStoragePath1 = "hdfs://path1";
final String remoteStoragePath2 = "hdfs://path2";
final String remoteStoragePath3 = "hdfs://path3";
hadoopStorageManager.registerRemoteStorage(
"app1", new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
hadoopStorageManager.registerRemoteStorage(
"app2", new RemoteStorageInfo(remoteStoragePath2, ImmutableMap.of("k3", "v3")));
hadoopStorageManager.registerRemoteStorage(
"app3", new RemoteStorageInfo(remoteStoragePath3, Maps.newHashMap()));
Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
assertEquals(3, appStorageMap.size());
assertEquals(Sets.newHashSet("app1", "app2", "app3"), appStorageMap.keySet());
HadoopStorage hs1 = hadoopStorageManager.getAppIdToStorages().get("app1");
assertSame(hadoopStorageManager.getPathToStorages().get(remoteStoragePath1), hs1);
assertEquals("v1", hs1.getConf().get("k1"));
assertEquals("v2", hs1.getConf().get("k2"));
assertNull(hs1.getConf().get("k3"));
HadoopStorage hs2 = hadoopStorageManager.getAppIdToStorages().get("app2");
assertSame(hadoopStorageManager.getPathToStorages().get(remoteStoragePath2), hs2);
assertEquals("v3", hs2.getConf().get("k3"));
assertNull(hs2.getConf().get("k1"));
assertNull(hs2.getConf().get("k2"));
HadoopStorage hs3 = hadoopStorageManager.getAppIdToStorages().get("app3");
assertSame(hadoopStorageManager.getPathToStorages().get(remoteStoragePath3), hs3);
assertNull(hs3.getConf().get("k1"));
assertNull(hs3.getConf().get("k2"));
assertNull(hs3.getConf().get("k3"));
}
@Test
public void testRemoveExpiredResourcesWithTwoReplicas(@TempDir File remoteBasePath)
throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
String shuffleServerId = "127.0.0.1:19999";
conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
final String remoteStoragePath1 = new File(remoteBasePath, "path1").getAbsolutePath();
String appId = "testRemoveExpiredResources";
hadoopStorageManager.registerRemoteStorage(
appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
HadoopStorage storage = appStorageMap.get(appId);
String appPath = ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
File appPathFile = new File(appPath);
File partitionDir = new File(appPathFile, "1/1-1/");
partitionDir.mkdirs();
// Simulate the case that there are two shuffle servers write data.
File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
dataFile.createNewFile();
File dataFile2 = new File(partitionDir, "shuffleserver2_1.data");
dataFile2.createNewFile();
assertTrue(partitionDir.exists());
// Purged for expired
assertEquals(1, appStorageMap.size());
AppPurgeEvent shufflePurgeEvent = new AppPurgeEvent(appId, "", null, true);
hadoopStorageManager.removeResources(shufflePurgeEvent);
assertEquals(0, appStorageMap.size());
// The directory of the partition should have not been deleted, for it was not empty.
assertTrue(partitionDir.exists());
assertFalse(dataFile.exists());
assertTrue(dataFile2.exists());
// Purged for unregister
AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
hadoopStorageManager.removeResources(appPurgeEvent);
assertEquals(0, appStorageMap.size());
assertFalse(appPathFile.exists());
}
@Test
public void testRemoveExpiredResourcesWithOneReplica(@TempDir File remoteBasePath)
throws Exception {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE_HDFS.name());
String shuffleServerId = "127.0.0.1:19999";
conf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, shuffleServerId);
HadoopStorageManager hadoopStorageManager = new HadoopStorageManager(conf);
final String remoteStoragePath1 = new File(remoteBasePath, "path1").getAbsolutePath();
String appId = "testRemoveExpiredResources2";
hadoopStorageManager.registerRemoteStorage(
appId, new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", "k2", "v2")));
Map<String, HadoopStorage> appStorageMap = hadoopStorageManager.getAppIdToStorages();
HadoopStorage storage = appStorageMap.get(appId);
String appPath = ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
File appPathFile = new File(appPath);
File partitionDir = new File(appPathFile, "1/1-1/");
partitionDir.mkdirs();
// Simulate the case that only one shuffle server writes data.
File dataFile = new File(partitionDir, shuffleServerId + "_1.data");
dataFile.createNewFile();
assertTrue(partitionDir.exists());
// purged for expired
assertEquals(1, appStorageMap.size());
AppPurgeEvent shufflePurgeEvent = new AppPurgeEvent(appId, "", null, true);
hadoopStorageManager.removeResources(shufflePurgeEvent);
assertEquals(0, appStorageMap.size());
// The directory of the application should have been deleted, for it was empty.
assertFalse(partitionDir.exists());
// purged for unregister
AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
hadoopStorageManager.removeResources(appPurgeEvent);
assertEquals(0, appStorageMap.size());
assertFalse(appPathFile.exists());
}
}