blob: c1169f42ff333f7aa1382e354f43a3cbcd253a73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.server.storage;
import java.lang.reflect.Constructor;
import java.util.Collection;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.storage.StorageInfo;
import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
import org.apache.uniffle.server.ShuffleDataReadEvent;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.server.storage.hybrid.StorageManagerSelector;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
public class HybridStorageManager implements StorageManager {
private static final Logger LOG = LoggerFactory.getLogger(HybridStorageManager.class);
private final StorageManager warmStorageManager;
private final StorageManager coldStorageManager;
private final StorageManagerSelector storageManagerSelector;
HybridStorageManager(ShuffleServerConf conf) {
warmStorageManager = new LocalStorageManager(conf);
coldStorageManager = new HadoopStorageManager(conf);
try {
AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy =
loadFallbackStrategy(conf);
this.storageManagerSelector =
loadManagerSelector(
conf, storageManagerFallbackStrategy, warmStorageManager, coldStorageManager);
} catch (Exception e) {
throw new RssException("Errors on loading selector manager.", e);
}
}
private StorageManagerSelector loadManagerSelector(
ShuffleServerConf conf,
AbstractStorageManagerFallbackStrategy storageManagerFallbackStrategy,
StorageManager warmStorageManager,
StorageManager coldStorageManager)
throws Exception {
String name = conf.get(ShuffleServerConf.HYBRID_STORAGE_MANAGER_SELECTOR_CLASS);
Class<?> klass = Class.forName(name);
Constructor<?> constructor =
klass.getConstructor(
StorageManager.class,
StorageManager.class,
AbstractStorageManagerFallbackStrategy.class,
conf.getClass());
StorageManagerSelector instance =
(StorageManagerSelector)
constructor.newInstance(
warmStorageManager, coldStorageManager, storageManagerFallbackStrategy, conf);
return instance;
}
public static AbstractStorageManagerFallbackStrategy loadFallbackStrategy(ShuffleServerConf conf)
throws Exception {
String name =
conf.getString(
ShuffleServerConf.HYBRID_STORAGE_FALLBACK_STRATEGY_CLASS,
HadoopStorageManagerFallbackStrategy.class.getCanonicalName());
Class<?> klass = Class.forName(name);
Constructor<?> constructor;
AbstractStorageManagerFallbackStrategy instance;
try {
constructor = klass.getConstructor(conf.getClass(), Boolean.TYPE);
instance = (AbstractStorageManagerFallbackStrategy) constructor.newInstance(conf);
} catch (NoSuchMethodException e) {
constructor = klass.getConstructor(conf.getClass());
instance = (AbstractStorageManagerFallbackStrategy) constructor.newInstance(conf);
}
return instance;
}
@Override
public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo) {
coldStorageManager.registerRemoteStorage(appId, remoteStorageInfo);
}
@Override
public Storage selectStorage(ShuffleDataFlushEvent event) {
return selectStorageManager(event).selectStorage(event);
}
@Override
public Storage selectStorage(ShuffleDataReadEvent event) {
return warmStorageManager.selectStorage(event);
}
@Override
public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) {
throw new UnsupportedOperationException();
}
private StorageManager selectStorageManager(ShuffleDataFlushEvent event) {
StorageManager storageManager = storageManagerSelector.select(event);
return storageManager;
}
@Override
public boolean write(Storage storage, ShuffleWriteHandler handler, ShuffleDataFlushEvent event) {
StorageManager underStorageManager = selectStorageManager(event);
return underStorageManager.write(storage, handler, event);
}
public void start() {}
public void stop() {}
@Override
public Checker getStorageChecker() {
return warmStorageManager.getStorageChecker();
}
@Override
public boolean canWrite(ShuffleDataFlushEvent event) {
return warmStorageManager.canWrite(event) || coldStorageManager.canWrite(event);
}
@Override
public void checkAndClearLeakedShuffleData(Collection<String> appIds) {
warmStorageManager.checkAndClearLeakedShuffleData(appIds);
}
@Override
public Map<String, StorageInfo> getStorageInfo() {
Map<String, StorageInfo> localStorageInfo = warmStorageManager.getStorageInfo();
localStorageInfo.putAll(coldStorageManager.getStorageInfo());
return localStorageInfo;
}
public void removeResources(PurgeEvent event) {
LOG.info("Start to remove resource of {}", event);
warmStorageManager.removeResources(event);
coldStorageManager.removeResources(event);
}
public StorageManager getColdStorageManager() {
return coldStorageManager;
}
public StorageManager getWarmStorageManager() {
return warmStorageManager;
}
}