| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.uniffle.server.storage; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.Stream; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import org.apache.commons.collections.CollectionUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.uniffle.common.RemoteStorageInfo; |
| import org.apache.uniffle.common.UnionKey; |
| import org.apache.uniffle.common.exception.RssException; |
| import org.apache.uniffle.common.storage.StorageInfo; |
| import org.apache.uniffle.common.storage.StorageMedia; |
| import org.apache.uniffle.common.storage.StorageStatus; |
| import org.apache.uniffle.common.util.RssUtils; |
| import org.apache.uniffle.common.util.ThreadUtils; |
| import org.apache.uniffle.server.Checker; |
| import org.apache.uniffle.server.LocalStorageChecker; |
| import org.apache.uniffle.server.ShuffleDataFlushEvent; |
| import org.apache.uniffle.server.ShuffleDataReadEvent; |
| import org.apache.uniffle.server.ShuffleServerConf; |
| import org.apache.uniffle.server.ShuffleServerMetrics; |
| import org.apache.uniffle.server.event.AppPurgeEvent; |
| import org.apache.uniffle.server.event.PurgeEvent; |
| import org.apache.uniffle.server.event.ShufflePurgeEvent; |
| import org.apache.uniffle.storage.common.LocalStorage; |
| import org.apache.uniffle.storage.common.Storage; |
| import org.apache.uniffle.storage.common.StorageMediaProvider; |
| import org.apache.uniffle.storage.factory.ShuffleHandlerFactory; |
| import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler; |
| import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest; |
| import org.apache.uniffle.storage.util.ShuffleStorageUtils; |
| import org.apache.uniffle.storage.util.StorageType; |
| |
| import static org.apache.uniffle.server.ShuffleServerConf.DISK_CAPACITY_WATERMARK_CHECK_ENABLED; |
| import static org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER; |
| |
| public class LocalStorageManager extends SingleStorageManager { |
| private static final Logger LOG = LoggerFactory.getLogger(LocalStorageManager.class); |
| private static final String UNKNOWN_USER_NAME = "unknown"; |
| |
| private final List<LocalStorage> localStorages; |
| private final List<String> storageBasePaths; |
| private final LocalStorageChecker checker; |
| |
| private final ConcurrentSkipListMap<String, LocalStorage> sortedPartitionsOfStorageMap; |
| private final List<StorageMediaProvider> typeProviders = Lists.newArrayList(); |
| |
| @VisibleForTesting |
| LocalStorageManager(ShuffleServerConf conf) { |
| super(conf); |
| storageBasePaths = RssUtils.getConfiguredLocalDirs(conf); |
| if (CollectionUtils.isEmpty(storageBasePaths)) { |
| throw new IllegalArgumentException("Base path dirs must not be empty"); |
| } |
| this.sortedPartitionsOfStorageMap = new ConcurrentSkipListMap<>(); |
| long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY); |
| double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO); |
| double highWaterMarkOfWrite = conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE); |
| double lowWaterMarkOfWrite = conf.get(ShuffleServerConf.LOW_WATER_MARK_OF_WRITE); |
| if (highWaterMarkOfWrite < lowWaterMarkOfWrite) { |
| throw new IllegalArgumentException( |
| "highWaterMarkOfWrite must be larger than lowWaterMarkOfWrite"); |
| } |
| |
| // We must make sure the order of `storageBasePaths` and `localStorages` is same, or some unit |
| // test may be fail |
| CountDownLatch countDownLatch = new CountDownLatch(storageBasePaths.size()); |
| AtomicInteger successCount = new AtomicInteger(); |
| ServiceLoader<StorageMediaProvider> loader = ServiceLoader.load(StorageMediaProvider.class); |
| for (StorageMediaProvider provider : loader) { |
| provider.init(conf); |
| typeProviders.add(provider); |
| } |
| ExecutorService executorService = ThreadUtils.getDaemonCachedThreadPool("LocalStorage-check"); |
| LocalStorage[] localStorageArray = new LocalStorage[storageBasePaths.size()]; |
| boolean isDiskCapacityWatermarkCheckEnabled = conf.get(DISK_CAPACITY_WATERMARK_CHECK_ENABLED); |
| for (int i = 0; i < storageBasePaths.size(); i++) { |
| final int idx = i; |
| String storagePath = storageBasePaths.get(i); |
| executorService.submit( |
| () -> { |
| try { |
| StorageMedia storageType = getStorageTypeForBasePath(storagePath); |
| LocalStorage.Builder builder = |
| LocalStorage.newBuilder() |
| .basePath(storagePath) |
| .capacity(capacity) |
| .ratio(ratio) |
| .lowWaterMarkOfWrite(lowWaterMarkOfWrite) |
| .highWaterMarkOfWrite(highWaterMarkOfWrite) |
| .localStorageMedia(storageType); |
| if (isDiskCapacityWatermarkCheckEnabled) { |
| builder.enableDiskCapacityWatermarkCheck(); |
| } |
| localStorageArray[idx] = builder.build(); |
| successCount.incrementAndGet(); |
| } catch (Exception e) { |
| LOG.error("LocalStorage init failed!", e); |
| } finally { |
| countDownLatch.countDown(); |
| } |
| }); |
| } |
| |
| try { |
| countDownLatch.await(); |
| } catch (InterruptedException e) { |
| LOG.error("Failed to wait initializing local storage.", e); |
| } |
| executorService.shutdown(); |
| |
| int failedCount = storageBasePaths.size() - successCount.get(); |
| long maxFailedNumber = conf.getLong(LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER); |
| if (failedCount > maxFailedNumber || successCount.get() == 0) { |
| throw new RssException( |
| String.format( |
| "Initialize %s local storage(s) failed, " |
| + "specified local storage paths size: %s, the conf of %s size: %s", |
| failedCount, |
| localStorageArray.length, |
| LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER.key(), |
| maxFailedNumber)); |
| } |
| localStorages = |
| Arrays.stream(localStorageArray).filter(Objects::nonNull).collect(Collectors.toList()); |
| LOG.info( |
| "Succeed to initialize storage paths: {}", |
| StringUtils.join( |
| localStorages.stream().map(LocalStorage::getBasePath).collect(Collectors.toList()))); |
| this.checker = new LocalStorageChecker(conf, localStorages); |
| } |
| |
| private StorageMedia getStorageTypeForBasePath(String basePath) { |
| for (StorageMediaProvider provider : this.typeProviders) { |
| StorageMedia result = provider.getStorageMediaFor(basePath); |
| if (result != StorageMedia.UNKNOWN) { |
| return result; |
| } |
| } |
| return StorageMedia.UNKNOWN; |
| } |
| |
| @Override |
| public Storage selectStorage(ShuffleDataFlushEvent event) { |
| String appId = event.getAppId(); |
| int shuffleId = event.getShuffleId(); |
| int partitionId = event.getStartPartition(); |
| |
| LocalStorage storage = |
| sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId, shuffleId, partitionId)); |
| if (storage != null) { |
| if (storage.isCorrupted()) { |
| if (storage.containsWriteHandler(appId, shuffleId, partitionId)) { |
| LOG.error( |
| "LocalStorage: {} is corrupted. Switching another storage for event: {}, some data will be lost", |
| storage.getBasePath(), |
| event); |
| } |
| } else { |
| if (event.getUnderStorage() == null) { |
| event.setUnderStorage(storage); |
| } |
| return storage; |
| } |
| } |
| |
| List<LocalStorage> candidates = |
| localStorages.stream() |
| .filter(x -> x.canWrite() && !x.isCorrupted()) |
| .collect(Collectors.toList()); |
| |
| if (candidates.size() == 0) { |
| return null; |
| } |
| final LocalStorage selectedStorage = |
| candidates.get( |
| ShuffleStorageUtils.getStorageIndex(candidates.size(), appId, shuffleId, partitionId)); |
| return sortedPartitionsOfStorageMap.compute( |
| UnionKey.buildKey(appId, shuffleId, partitionId), |
| (key, localStorage) -> { |
| // If this is the first time to select storage or existing storage is corrupted, |
| // we should refresh the cache. |
| if (localStorage == null |
| || localStorage.isCorrupted() |
| || event.getUnderStorage() == null) { |
| event.setUnderStorage(selectedStorage); |
| return selectedStorage; |
| } |
| return localStorage; |
| }); |
| } |
| |
| @Override |
| public Storage selectStorage(ShuffleDataReadEvent event) { |
| String appId = event.getAppId(); |
| int shuffleId = event.getShuffleId(); |
| int partitionId = event.getStartPartition(); |
| |
| return sortedPartitionsOfStorageMap.get(UnionKey.buildKey(appId, shuffleId, partitionId)); |
| } |
| |
| @Override |
| public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) { |
| super.updateWriteMetrics(event, writeTime); |
| ShuffleServerMetrics.counterTotalLocalFileWriteDataSize |
| .labels(ShuffleServerMetrics.LOCAL_DISK_PATH_LABEL_ALL) |
| .inc(event.getSize()); |
| if (event.getUnderStorage() != null) { |
| ShuffleServerMetrics.counterTotalLocalFileWriteDataSize |
| .labels(event.getUnderStorage().getStoragePath()) |
| .inc(event.getSize()); |
| } |
| } |
| |
| @Override |
| public Checker getStorageChecker() { |
| return checker; |
| } |
| |
| @Override |
| public void removeResources(PurgeEvent event) { |
| String appId = event.getAppId(); |
| String user = event.getUser(); |
| List<Integer> shuffleSet = |
| Optional.ofNullable(event.getShuffleIds()).orElse(Collections.emptyList()); |
| |
| // Remove partitions to storage mapping cache |
| cleanupStorageSelectionCache(event); |
| |
| for (LocalStorage storage : localStorages) { |
| if (event instanceof AppPurgeEvent) { |
| storage.removeHandlers(appId); |
| } |
| for (Integer shuffleId : shuffleSet) { |
| storage.removeResources(RssUtils.generateShuffleKey(appId, shuffleId)); |
| } |
| } |
| // delete shuffle data for application |
| ShuffleDeleteHandler deleteHandler = |
| ShuffleHandlerFactory.getInstance() |
| .createShuffleDeleteHandler( |
| new CreateShuffleDeleteHandlerRequest( |
| StorageType.LOCALFILE.name(), new Configuration())); |
| |
| List<String> deletePaths = |
| storageBasePaths.stream() |
| .flatMap( |
| path -> { |
| String basicPath = ShuffleStorageUtils.getFullShuffleDataFolder(path, appId); |
| if (event instanceof ShufflePurgeEvent) { |
| List<String> paths = new ArrayList<>(); |
| for (int shuffleId : shuffleSet) { |
| paths.add( |
| ShuffleStorageUtils.getFullShuffleDataFolder( |
| basicPath, String.valueOf(shuffleId))); |
| } |
| return paths.stream(); |
| } else { |
| return Stream.of(basicPath); |
| } |
| }) |
| .collect(Collectors.toList()); |
| |
| deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user); |
| } |
| |
| private void cleanupStorageSelectionCache(PurgeEvent event) { |
| Function<String, Boolean> deleteConditionFunc = null; |
| String prefixKey = null; |
| if (event instanceof AppPurgeEvent) { |
| prefixKey = UnionKey.buildKey(event.getAppId(), ""); |
| deleteConditionFunc = |
| partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId()); |
| } else if (event instanceof ShufflePurgeEvent) { |
| int shuffleId = event.getShuffleIds().get(0); |
| prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId, ""); |
| deleteConditionFunc = |
| partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId(), shuffleId); |
| } |
| if (prefixKey == null) { |
| throw new RssException("Prefix key is null when handles event: " + event); |
| } |
| long startTime = System.currentTimeMillis(); |
| deleteElement(sortedPartitionsOfStorageMap.tailMap(prefixKey), deleteConditionFunc); |
| LOG.info( |
| "Cleaning the storage selection cache costs: {}(ms) for event: {}", |
| System.currentTimeMillis() - startTime, |
| event); |
| } |
| |
| private <K, V> void deleteElement( |
| Map<K, V> sortedPartitionsOfStorageMap, Function<K, Boolean> deleteConditionFunc) { |
| Iterator<Map.Entry<K, V>> iterator = sortedPartitionsOfStorageMap.entrySet().iterator(); |
| while (iterator.hasNext()) { |
| Map.Entry<K, V> entry = iterator.next(); |
| if (deleteConditionFunc.apply(entry.getKey())) { |
| iterator.remove(); |
| } else { |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public void registerRemoteStorage(String appId, RemoteStorageInfo remoteStorageInfo) { |
| // ignore |
| } |
| |
| @Override |
| public void checkAndClearLeakedShuffleData(Collection<String> appIds) { |
| Set<String> appIdsOnStorages = new HashSet<>(); |
| for (LocalStorage localStorage : localStorages) { |
| if (!localStorage.isCorrupted()) { |
| Set<String> appIdsOnStorage = localStorage.getAppIds(); |
| appIdsOnStorages.addAll(appIdsOnStorage); |
| } |
| } |
| |
| for (String appId : appIdsOnStorages) { |
| if (!appIds.contains(appId)) { |
| ShuffleDeleteHandler deleteHandler = |
| ShuffleHandlerFactory.getInstance() |
| .createShuffleDeleteHandler( |
| new CreateShuffleDeleteHandlerRequest( |
| StorageType.LOCALFILE.name(), new Configuration())); |
| String[] deletePaths = new String[storageBasePaths.size()]; |
| for (int i = 0; i < storageBasePaths.size(); i++) { |
| deletePaths[i] = |
| ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths.get(i), appId); |
| } |
| deleteHandler.delete(deletePaths, appId, UNKNOWN_USER_NAME); |
| } |
| } |
| } |
| |
| @Override |
| public Map<String, StorageInfo> getStorageInfo() { |
| Map<String, StorageInfo> result = Maps.newHashMap(); |
| for (LocalStorage storage : localStorages) { |
| String mountPoint = storage.getMountPoint(); |
| long capacity = storage.getCapacity(); |
| long wroteBytes = storage.getServiceUsedBytes(); |
| StorageStatus status = StorageStatus.NORMAL; |
| if (storage.isCorrupted()) { |
| status = StorageStatus.UNHEALTHY; |
| } else if (!storage.canWrite()) { |
| status = StorageStatus.OVERUSED; |
| } |
| StorageMedia media = storage.getStorageMedia(); |
| if (media == null) { |
| media = StorageMedia.UNKNOWN; |
| } |
| StorageInfo info = new StorageInfo(mountPoint, media, capacity, wroteBytes, status); |
| result.put(mountPoint, info); |
| } |
| return result; |
| } |
| |
| public List<LocalStorage> getStorages() { |
| return localStorages; |
| } |
| |
| // Only for test. |
| @VisibleForTesting |
| public Map<String, LocalStorage> getSortedPartitionsOfStorageMap() { |
| return sortedPartitionsOfStorageMap; |
| } |
| } |