blob: ceab594bcce4a0c6211a38288e117a2771b5799e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.examples.filestore;
import org.apache.ratis.examples.filestore.FileInfo.ReadOnly;
import org.apache.ratis.examples.filestore.FileInfo.UnderConstruction;
import org.apache.ratis.proto.ExamplesProtos.ReadReplyProto;
import org.apache.ratis.proto.ExamplesProtos.WriteReplyProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.StringUtils;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
public class FileStore implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(FileStore.class);
static class FileMap {
private final Object name;
private final Map<Path, FileInfo> map = new ConcurrentHashMap<>();
FileMap(Supplier<String> name) {
this.name = StringUtils.stringSupplierAsObject(name);
}
FileInfo get(String relative) throws FileNotFoundException {
return applyFunction(relative, map::get);
}
FileInfo remove(String relative) throws FileNotFoundException {
LOG.trace("{}: remove {}", name, relative);
return applyFunction(relative, map::remove);
}
private FileInfo applyFunction(String relative, Function<Path, FileInfo> f)
throws FileNotFoundException {
final FileInfo info = f.apply(normalize(relative));
if (info == null) {
throw new FileNotFoundException("File " + relative + " not found in " + name);
}
return info;
}
void putNew(UnderConstruction uc) {
LOG.trace("{}: putNew {}", name, uc.getRelativePath());
CollectionUtils.putNew(uc.getRelativePath(), uc, map, name::toString);
}
ReadOnly close(UnderConstruction uc) {
LOG.trace("{}: close {}", name, uc.getRelativePath());
final ReadOnly ro = new ReadOnly(uc);
CollectionUtils.replaceExisting(uc.getRelativePath(), uc, ro, map, name::toString);
return ro;
}
}
private final Supplier<RaftPeerId> idSupplier;
private final Supplier<Path> rootSupplier;
private final FileMap files;
private final ExecutorService writer = Executors.newFixedThreadPool(10);
private final ExecutorService committer = Executors.newFixedThreadPool(3);
private final ExecutorService reader = Executors.newFixedThreadPool(10);
private final ExecutorService deleter = Executors.newFixedThreadPool(3);
public FileStore(Supplier<RaftPeerId> idSupplier, Path dir) {
this.idSupplier = idSupplier;
this.rootSupplier = JavaUtils.memoize(
() -> dir.resolve(getId().toString()).normalize().toAbsolutePath());
this.files = new FileMap(JavaUtils.memoize(() -> idSupplier.get() + ":files"));
}
public RaftPeerId getId() {
return Objects.requireNonNull(idSupplier.get(), getClass().getSimpleName() + " is not initialized.");
}
public Path getRoot() {
return rootSupplier.get();
}
static Path normalize(String path) {
Objects.requireNonNull(path, "path == null");
return Paths.get(path).normalize();
}
Path resolve(Path relative) throws IOException {
final Path root = getRoot();
final Path full = root.resolve(relative).normalize().toAbsolutePath();
if (full.equals(root)) {
throw new IOException("The file path " + relative + " resolved to " + full
+ " is the root directory " + root);
} else if (!full.startsWith(root)) {
throw new IOException("The file path " + relative + " resolved to " + full
+ " is not a sub-path under root directory " + root);
}
return full;
}
CompletableFuture<ReadReplyProto> read(String relative, long offset, long length) {
final Supplier<String> name = () -> "read(" + relative
+ ", " + offset + ", " + length + ") @" + getId();
final CheckedSupplier<ReadReplyProto, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
final FileInfo info = files.get(relative);
final ReadReplyProto.Builder reply = ReadReplyProto.newBuilder()
.setResolvedPath(FileStoreCommon.toByteString(info.getRelativePath()))
.setOffset(offset);
final ByteString bytes = info.read(this::resolve, offset, length);
return reply.setData(bytes).build();
}, name);
return submit(task, reader);
}
CompletableFuture<Path> delete(long index, String relative) {
final Supplier<String> name = () -> "delete(" + relative + ") @" + getId() + ":" + index;
final CheckedSupplier<Path, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {
final FileInfo info = files.remove(relative);
FileUtils.delete(resolve(info.getRelativePath()));
return info.getRelativePath();
}, name);
return submit(task, deleter);
}
static <T> CompletableFuture<T> submit(
CheckedSupplier<T, IOException> task, ExecutorService executor) {
final CompletableFuture<T> f = new CompletableFuture<>();
executor.submit(() -> {
try {
f.complete(task.get());
} catch (IOException e) {
f.completeExceptionally(new IOException("Failed " + task, e));
}
});
return f;
}
CompletableFuture<WriteReplyProto> submitCommit(
long index, String relative, boolean close, long offset, int size) {
final Function<UnderConstruction, ReadOnly> converter = close ? files::close: null;
final UnderConstruction uc;
try {
uc = files.get(relative).asUnderConstruction();
} catch (FileNotFoundException e) {
return FileStoreCommon.completeExceptionally(
index, "Failed to write to " + relative, e);
}
return uc.submitCommit(offset, size, converter, committer, getId(), index)
.thenApply(n -> WriteReplyProto.newBuilder()
.setResolvedPath(FileStoreCommon.toByteString(uc.getRelativePath()))
.setOffset(offset)
.setLength(n)
.build());
}
CompletableFuture<Integer> write(
long index, String relative, boolean close, long offset, ByteString data) {
final int size = data != null? data.size(): 0;
LOG.trace("write {}, offset={}, size={}, close? {} @{}:{}",
relative, offset, size, close, getId(), index);
final boolean createNew = offset == 0L;
final UnderConstruction uc;
if (createNew) {
uc = new UnderConstruction(normalize(relative));
files.putNew(uc);
} else {
try {
uc = files.get(relative).asUnderConstruction();
} catch (FileNotFoundException e) {
return FileStoreCommon.completeExceptionally(
index, "Failed to write to " + relative, e);
}
}
return size == 0 && !close? CompletableFuture.completedFuture(0)
: createNew? uc.submitCreate(this::resolve, data, close, writer, getId(), index)
: uc.submitWrite(offset, data, close, writer, getId(), index);
}
@Override
public void close() {
writer.shutdownNow();
committer.shutdownNow();
reader.shutdownNow();
deleter.shutdownNow();
}
}