/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.storage.common;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.storage.StorageMedia;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ServerReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileServerReadHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileWriteHandler;
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
import org.apache.uniffle.storage.request.CreateShuffleWriteHandlerRequest;
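/**
 * A local-disk backed storage. Each instance owns one base directory, tracks per-shuffle
 * metadata, and decides via {@link #canWrite()} whether new shuffle data may be written,
 * based on service-used and (optionally) physical-disk-used watermarks.
 *
 * <p>A minimal construction sketch; the path and watermark values below are illustrative,
 * not defaults:
 *
 * <pre>{@code
 * LocalStorage storage = LocalStorage.newBuilder()
 *     .basePath("/data/rss")
 *     .capacity(-1L) // negative: derive capacity from ratio * physical disk size
 *     .ratio(0.9)
 *     .highWaterMarkOfWrite(95.0)
 *     .lowWaterMarkOfWrite(85.0)
 *     .build();
 * }</pre>
 */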
public class LocalStorage extends AbstractStorage {
private static final Logger LOG = LoggerFactory.getLogger(LocalStorage.class);
public static final String STORAGE_HOST = "local";
private final long diskCapacity;
private volatile long diskAvailableBytes;
private volatile long serviceUsedBytes;
// whether canWrite() also checks physical disk usage against the watermarks;
// tests can force-enable this via enableDiskCapacityCheck()
private boolean enableDiskCapacityCheck = false;
private long capacity;
private final String basePath;
private final String mountPoint;
private final double highWaterMarkOfWrite;
private final double lowWaterMarkOfWrite;
private final LocalStorageMeta metaData = new LocalStorageMeta();
private final StorageMedia media;
private boolean isSpaceEnough = true;
private volatile boolean isCorrupted = false;
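// "capacity" is the logical quota this shuffle service may use on the disk, while
// "diskCapacity" and "diskAvailableBytes" describe the physical disk backing basePath.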
private LocalStorage(Builder builder) {
this.basePath = builder.basePath;
this.highWaterMarkOfWrite = builder.highWaterMarkOfWrite;
this.lowWaterMarkOfWrite = builder.lowWaterMarkOfWrite;
this.capacity = builder.capacity;
this.media = builder.media;
this.enableDiskCapacityCheck = builder.enableDiskCapacityWatermarkCheck;
File baseFolder = new File(basePath);
try {
// ensure the base folder exists and is a directory (similar to mkdir -p)
FileUtils.forceMkdir(baseFolder);
// clean any data left over from a previous run
FileUtils.cleanDirectory(baseFolder);
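// note: FileStore.name() generally returns the name of the backing device/volume rather
// than the mount path, so storages on the same device report the same mount point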
FileStore store = Files.getFileStore(baseFolder.toPath());
this.mountPoint = store.name();
} catch (IOException ioe) {
LOG.warn("Init base directory " + basePath + " fail, the disk should be corrupted", ioe);
throw new RssException(ioe);
}
this.diskCapacity = baseFolder.getTotalSpace();
this.diskAvailableBytes = baseFolder.getUsableSpace();
// a negative capacity means "derive it from the physical disk": capacity = diskCapacity * ratio
if (capacity < 0L) {
this.capacity = (long) (diskCapacity * builder.ratio);
LOG.info(
"The `rss.server.disk.capacity` is not specified or is negative, so the "
+ "ratio(`rss.server.disk.capacity.ratio`:{}) * disk space({}) is used",
builder.ratio,
diskCapacity);
} else {
final long freeSpace = diskAvailableBytes;
if (freeSpace < capacity) {
throw new IllegalArgumentException(
"The available capacity "
+ freeSpace
+ " of disk "
+ basePath
+ " is smaller than the configured capacity "
+ capacity);
}
}
}
@Override
public String getStoragePath() {
return basePath;
}
@Override
public String getStorageHost() {
return STORAGE_HOST;
}
@Override
public void updateWriteMetrics(StorageWriteMetrics metrics) {
updateWrite(
RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId()),
metrics.getDataSize(),
metrics.getPartitions());
}
@Override
public void updateReadMetrics(StorageReadMetrics metrics) {
String shuffleKey = RssUtils.generateShuffleKey(metrics.getAppId(), metrics.getShuffleId());
prepareStartRead(shuffleKey);
updateShuffleLastReadTs(shuffleKey);
}
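// Write/read handlers operate on files under per-application subdirectories of basePath;
// see getAppIds() below, which relies on that layout.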
@Override
ShuffleWriteHandler newWriteHandler(CreateShuffleWriteHandlerRequest request) {
return new LocalFileWriteHandler(
request.getAppId(),
request.getShuffleId(),
request.getStartPartition(),
request.getEndPartition(),
basePath,
request.getFileNamePrefix());
}
@Override
protected ServerReadHandler newReadHandler(CreateShuffleReadHandlerRequest request) {
return new LocalFileServerReadHandler(
request.getAppId(),
request.getShuffleId(),
request.getPartitionId(),
request.getPartitionNumPerRange(),
request.getPartitionNum(),
basePath);
}
// only for tests.
@VisibleForTesting
public void enableDiskCapacityCheck() {
this.enableDiskCapacityCheck = true;
}
public long getDiskCapacity() {
return diskCapacity;
}
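// canWrite() applies hysteresis between two watermarks: once usage (as a percentage)
// crosses the high watermark, writes are rejected until usage falls back below the low
// watermark, which prevents flapping around a single threshold. For example, with
// highWaterMarkOfWrite=95 and lowWaterMarkOfWrite=85, a storage that reaches 96% usage
// stops accepting writes and only resumes once usage drops below 85%.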
@Override
public boolean canWrite() {
boolean serviceUsedCapacityCheck;
boolean diskUsedCapacityCheck;
if (isSpaceEnough) {
serviceUsedCapacityCheck =
(double) (serviceUsedBytes * 100) / capacity < highWaterMarkOfWrite;
diskUsedCapacityCheck =
((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity
< highWaterMarkOfWrite;
} else {
serviceUsedCapacityCheck = (double) (serviceUsedBytes * 100) / capacity < lowWaterMarkOfWrite;
diskUsedCapacityCheck =
((double) (diskCapacity - diskAvailableBytes)) * 100 / diskCapacity < lowWaterMarkOfWrite;
}
isSpaceEnough = serviceUsedCapacityCheck && (!enableDiskCapacityCheck || diskUsedCapacityCheck);
return isSpaceEnough && !isCorrupted;
}
public String getBasePath() {
return basePath;
}
public void createMetadataIfNotExist(String shuffleKey) {
metaData.createMetadataIfNotExist(shuffleKey);
}
public void updateWrite(String shuffleKey, long delta, List<Integer> partitionList) {
metaData.updateDiskSize(delta);
metaData.addShufflePartitionList(shuffleKey, partitionList);
metaData.updateShuffleSize(shuffleKey, delta);
}
public void prepareStartRead(String key) {
metaData.prepareStartRead(key);
}
public void updateShuffleLastReadTs(String shuffleKey) {
metaData.updateShuffleLastReadTs(shuffleKey);
}
@VisibleForTesting
public LocalStorageMeta getMetaData() {
return metaData;
}
public long getCapacity() {
return capacity;
}
public String getMountPoint() {
return mountPoint;
}
public StorageMedia getStorageMedia() {
return media;
}
// This is the only place where shuffle metadata is removed; the clean and gc threads may
// remove it concurrently or serially. The force-uploader thread may update the shuffle
// size, so the gc thread must acquire the write lock before updating the disk size. The
// force-uploader thread cannot get the lock once the shuffle has been removed by the gc
// thread, so the shuffle key is added back to expiredShuffleKeys when the lock is obtained
// but the write lock cannot be acquired.
public void removeResources(String shuffleKey) {
LOG.info("Start to remove resource of {}", shuffleKey);
try {
metaData.updateDiskSize(-metaData.getShuffleSize(shuffleKey));
// remove this shuffle's metadata entry
metaData.remoteShuffle(shuffleKey);
LOG.info(
"Finished removing resources of {}, disk size is {} with {} shuffle metadata entries",
shuffleKey,
metaData.getDiskSize(),
metaData.getShuffleMetaSet().size());
} catch (Exception e) {
LOG.error("Fail to update disk size", e);
}
}
public boolean isCorrupted() {
return isCorrupted;
}
public void markCorrupted() {
isCorrupted = true;
}
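// Each application writes its data under <basePath>/<appId>, so the non-hidden top-level
// directories of the base path correspond to the application ids present on this disk.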
public Set<String> getAppIds() {
Set<String> appIds = new HashSet<>();
File baseFolder = new File(basePath);
File[] files = baseFolder.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory() && !file.isHidden()) {
appIds.add(file.getName());
}
}
}
return appIds;
}
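// The setters below record measurements taken outside this class (typically by a periodic
// storage checker thread); after construction this class does not re-measure the disk itself.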
public void updateDiskAvailableBytes(long bytes) {
this.diskAvailableBytes = bytes;
}
public void updateServiceUsedBytes(long usedBytes) {
this.serviceUsedBytes = usedBytes;
}
public long getServiceUsedBytes() {
return serviceUsedBytes;
}
// Only for test
@VisibleForTesting
public void markSpaceFull() {
isSpaceEnough = false;
}
public static class Builder {
private long capacity;
private double ratio;
private double lowWaterMarkOfWrite;
private double highWaterMarkOfWrite;
private String basePath;
private StorageMedia media;
private boolean enableDiskCapacityWatermarkCheck;
private Builder() {}
public Builder capacity(long capacity) {
this.capacity = capacity;
return this;
}
public Builder ratio(double ratio) {
this.ratio = ratio;
return this;
}
public Builder lowWaterMarkOfWrite(double lowWaterMarkOfWrite) {
this.lowWaterMarkOfWrite = lowWaterMarkOfWrite;
return this;
}
public Builder basePath(String basePath) {
this.basePath = basePath;
return this;
}
public Builder highWaterMarkOfWrite(double highWaterMarkOfWrite) {
this.highWaterMarkOfWrite = highWaterMarkOfWrite;
return this;
}
public Builder localStorageMedia(StorageMedia media) {
this.media = media;
return this;
}
public Builder enableDiskCapacityWatermarkCheck() {
this.enableDiskCapacityWatermarkCheck = true;
return this;
}
public LocalStorage build() {
return new LocalStorage(this);
}
}
public static Builder newBuilder() {
return new Builder();
}
}