refact(core): optimized batch removal of remaining indices consumed by a single consumer (#2203)
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
index 0b45f7f..e655b7c 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
@@ -23,6 +23,7 @@
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
+import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.task.ServerInfoManager;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.type.define.GraphReadMode;
@@ -90,4 +91,6 @@
RateLimiter readRateLimiter();
RamTable ramtable();
+
+ <T> void submitEphemeralJob(EphemeralJob<T> job);
}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index c1d0108..42a6715 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -51,6 +51,7 @@
import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
import org.apache.hugegraph.backend.store.ram.RamTable;
+import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.CoreOptions;
@@ -60,6 +61,7 @@
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.exception.NotAllowException;
import org.apache.hugegraph.io.HugeGraphIoRegistry;
+import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.masterelection.ClusterRoleStore;
import org.apache.hugegraph.masterelection.Config;
import org.apache.hugegraph.masterelection.RoleElectionConfig;
@@ -1163,6 +1165,7 @@
private class StandardHugeGraphParams implements HugeGraphParams {
private HugeGraph graph = StandardHugeGraph.this;
+ private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this);
private void graph(HugeGraph graph) {
this.graph = graph;
@@ -1304,6 +1307,11 @@
public RamTable ramtable() {
return StandardHugeGraph.this.ramtable;
}
+
+ @Override
+ public <T> void submitEphemeralJob(EphemeralJob<T> job) {
+ this.ephemeralJobQueue.add(job);
+ }
}
private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
index 6cf08f1..7c8d604 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -164,7 +164,7 @@
try {
LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
long begin = System.currentTimeMillis();
- String rootDir = Paths.get(snapshotDir).getParent().toString();
+ String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
@@ -200,7 +200,7 @@
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
- String parentPath = Paths.get(dataPath).getParent().toString();
+ String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString();
String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
index 389a2ec..b6809e6 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
@@ -77,7 +77,14 @@
public void setResponse(StoreCommandResponse response) {
if (response.getStatus()) {
LOG.debug("StoreCommandResponse status ok");
- future.complete(Status.OK(), () -> null);
+ // This code forwards the request to the Raft leader and considers the operation successful
+ // if it's forwarded successfully. It returns a RaftClosure because the calling
+ // logic expects a RaftClosure result. Specifically, if the current instance is the Raft leader,
+ // it executes the corresponding logic locally and notifies the calling logic asynchronously
+ // via RaftClosure. Therefore, the result is returned as a RaftClosure here.
+ RaftClosure<Status> supplierFuture = new RaftClosure<>();
+ supplierFuture.complete(Status.OK());
+ future.complete(Status.OK(), () -> supplierFuture);
} else {
LOG.debug("StoreCommandResponse status error");
Status status = new Status(RaftError.UNKNOWN,
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
index f4ff8b3..bc0bc0b 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
@@ -36,6 +36,7 @@
import org.apache.hugegraph.backend.page.PageState;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
@@ -69,7 +70,6 @@
import org.apache.hugegraph.exception.NotSupportException;
import org.apache.hugegraph.iterator.Metadatable;
import org.apache.hugegraph.job.EphemeralJob;
-import org.apache.hugegraph.job.EphemeralJobBuilder;
import org.apache.hugegraph.job.system.DeleteExpiredJob;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.schema.IndexLabel;
@@ -81,7 +81,6 @@
import org.apache.hugegraph.structure.HugeIndex.IdWithExpiredTime;
import org.apache.hugegraph.structure.HugeProperty;
import org.apache.hugegraph.structure.HugeVertex;
-import org.apache.hugegraph.task.HugeTask;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Action;
import org.apache.hugegraph.type.define.HugeKeys;
@@ -115,15 +114,11 @@
conf.get(CoreOptions.QUERY_INDEX_INTERSECT_THRESHOLD);
}
- protected Id asyncRemoveIndexLeft(ConditionQuery query,
- HugeElement element) {
+ protected void asyncRemoveIndexLeft(ConditionQuery query,
+ HugeElement element) {
LOG.info("Remove left index: {}, query: {}", element, query);
RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element);
- HugeTask<?> task = EphemeralJobBuilder.of(this.graph())
- .name(element.id().asString())
- .job(job)
- .schedule();
- return task.id();
+ this.params().submitEphemeralJob(job);
}
@Watched(prefix = "index")
@@ -1717,7 +1712,8 @@
}
}
- public static class RemoveLeftIndexJob extends EphemeralJob<Object> {
+ public static class RemoveLeftIndexJob extends EphemeralJob<Long>
+ implements EphemeralJobQueue.Reduce<Long> {
private static final String REMOVE_LEFT_INDEX = "remove_left_index";
@@ -1741,7 +1737,7 @@
}
@Override
- public Object execute() {
+ public Long execute() {
this.tx = this.element.schemaLabel().system() ?
this.params().systemTransaction().indexTransaction() :
this.params().graphTransaction().indexTransaction();
@@ -1780,7 +1776,6 @@
// Process secondary index or search index
sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
}
- this.tx.commit();
return rCount + sCount;
}
@@ -1808,7 +1803,6 @@
}
// Remove LeftIndex after constructing remove job
this.query.removeElementLeftIndex(element.id());
- this.tx.commit();
return count;
}
@@ -1873,11 +1867,9 @@
*/
this.tx.updateIndex(il.id(), element, false);
}
- this.tx.commit();
if (this.deletedByError(element, incorrectIndexFields,
incorrectPKs)) {
this.tx.updateIndex(il.id(), deletion, false);
- this.tx.commit();
} else {
count++;
}
@@ -1949,5 +1941,18 @@
return (HugeEdge) QueryResults.one(iter);
}
}
+
+ @Override
+ public Long reduce(Long t1, Long t2) {
+ if (t1 == null) {
+ return t2;
+ }
+
+ if (t2 == null) {
+ return t1;
+ }
+
+ return t1 + t2;
+ }
}
}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java
new file mode 100644
index 0000000..70f4907
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.query.Query;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.job.EphemeralJob;
+import org.apache.hugegraph.job.EphemeralJobBuilder;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
+
+public class EphemeralJobQueue {
+
+ private static final Logger LOG = Log.logger(EphemeralJobQueue.class);
+
+ private static final long CAPACITY = 100 * Query.COMMIT_BATCH;
+
+ private final BlockingQueue<EphemeralJob<?>> pendingQueue;
+
+ private final AtomicReference<State> state;
+
+ private final HugeGraphParams graph;
+
+ private enum State {
+ INIT,
+ EXECUTE,
+ }
+
+ public EphemeralJobQueue(HugeGraphParams graph) {
+ this.state = new AtomicReference<>(State.INIT);
+ this.graph = graph;
+ this.pendingQueue = new ArrayBlockingQueue<>((int) CAPACITY);
+ }
+
+ public boolean add(EphemeralJob<?> job) {
+ if (job == null) {
+ return false;
+ }
+
+ if (!this.pendingQueue.offer(job)) {
+ LOG.warn("The pending queue of EphemeralJobQueue is full, {} job " +
+ "will be ignored", job.type());
+ return false;
+ }
+
+ this.reScheduleIfNeeded();
+ return true;
+ }
+
+ protected HugeGraphParams params() {
+ return this.graph;
+ }
+
+ protected void clear() {
+ this.pendingQueue.clear();
+ }
+
+ protected EphemeralJob<?> poll() {
+ return this.pendingQueue.poll();
+ }
+
+ public void consumeComplete() {
+ this.state.compareAndSet(State.EXECUTE, State.INIT);
+ }
+
+ public void reScheduleIfNeeded() {
+ if (this.state.compareAndSet(State.INIT, State.EXECUTE)) {
+ try {
+ BatchEphemeralJob job = new BatchEphemeralJob(this);
+ EphemeralJobBuilder.of(this.graph.graph())
+ .name("batch-ephemeral-job")
+ .job(job)
+ .schedule();
+ } catch (Throwable e) {
+ // Maybe if it fails, consider clearing all the data in the pendingQueue,
+ // or start a scheduled retry task to retry until success.
+ LOG.warn("Failed to schedule BatchEphemeralJob", e);
+ this.pendingQueue.clear();
+ this.state.compareAndSet(State.EXECUTE, State.INIT);
+ }
+ }
+ }
+
+ public boolean isEmpty() {
+ return this.pendingQueue.isEmpty();
+ }
+
+ public static class BatchEphemeralJob extends EphemeralJob<Object> {
+
+ private static final long PAGE_SIZE = Query.COMMIT_BATCH;
+ private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job";
+ private static final long MAX_CONSUME_COUNT = 2 * PAGE_SIZE;
+
+ private WeakReference<EphemeralJobQueue> queueWeakReference;
+
+ public BatchEphemeralJob(EphemeralJobQueue queue) {
+ this.queueWeakReference = new WeakReference<>(queue);
+ }
+
+ @Override
+ public String type() {
+ return BATCH_EPHEMERAL_JOB;
+ }
+
+ @Override
+ public Object execute() throws Exception {
+ boolean stop = false;
+ Object result = null;
+ int consumeCount = 0;
+ InterruptedException interruptedException = null;
+ EphemeralJobQueue queue;
+ List<EphemeralJob<?>> batchJobs = new ArrayList<>();
+ while (!stop) {
+ if (interruptedException == null && Thread.currentThread().isInterrupted()) {
+ interruptedException = new InterruptedException();
+ }
+
+ queue = this.queueWeakReference.get();
+ if (queue == null) {
+ stop = true;
+ continue;
+ }
+
+ if (queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT ||
+ interruptedException != null) {
+ queue.consumeComplete();
+ stop = true;
+ if (!queue.isEmpty()) {
+ queue.reScheduleIfNeeded();
+ }
+ continue;
+ }
+
+ try {
+ while (!queue.isEmpty() && batchJobs.size() < PAGE_SIZE) {
+ EphemeralJob<?> job = queue.poll();
+ if (job == null) {
+ continue;
+ }
+ batchJobs.add(job);
+ }
+
+ if (batchJobs.isEmpty()) {
+ continue;
+ }
+
+ consumeCount += batchJobs.size();
+ result = this.executeBatchJob(batchJobs, result);
+
+ } catch (InterruptedException e) {
+ interruptedException = e;
+ } finally {
+ batchJobs.clear();
+ }
+ }
+
+ if (interruptedException != null) {
+ Thread.currentThread().interrupt();
+ throw interruptedException;
+ }
+
+ return result;
+ }
+
+ private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prevResult) throws Exception {
+ GraphTransaction graphTx = this.params().systemTransaction();
+ GraphTransaction systemTx = this.params().graphTransaction();
+ Object result = prevResult;
+ for (EphemeralJob<?> job : jobs) {
+ this.initJob(job);
+ Object obj = job.call();
+ if (job instanceof Reduce) {
+ result = ((Reduce) job).reduce(result, obj);
+ }
+ }
+
+ graphTx.commit();
+ systemTx.commit();
+
+ return result;
+ }
+
+ private void initJob(EphemeralJob<?> job) {
+ job.graph(this.graph());
+ job.params(this.params());
+ }
+
+ @Override
+ public Object call() throws Exception {
+ try {
+ return super.call();
+ } catch (Throwable e) {
+ LOG.warn("Failed to execute BatchEphemeralJob", e);
+ EphemeralJobQueue queue = this.queueWeakReference.get();
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ if (queue != null) {
+ queue.clear();
+ queue.consumeComplete();
+ }
+ throw e;
+ }
+
+ if (queue != null) {
+ queue.consumeComplete();
+ if (!queue.isEmpty()) {
+ queue.reScheduleIfNeeded();
+ }
+ }
+ throw e;
+ }
+ }
+ }
+
+ public interface Reduce<T> {
+ T reduce(T t1, T t2);
+ }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
index 2a01ba6..0f5c179 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
@@ -145,7 +145,7 @@
Files.createDirectories(newPath);
} else {
// check parent folder again
- Path parent = newPath.getParent();
+ Path parent = newPath.toAbsolutePath().getParent();
if (parent != null) {
if (Files.notExists(parent)) {
Files.createDirectories(parent);
@@ -176,7 +176,7 @@
public static void compressZip(String inputDir, String outputFile,
Checksum checksum) throws IOException {
- String rootDir = Paths.get(inputDir).getParent().toString();
+ String rootDir = Paths.get(inputDir).toAbsolutePath().getParent().toString();
String sourceDir = Paths.get(inputDir).getFileName().toString();
compressZip(rootDir, sourceDir, outputFile, checksum);
}
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
index 3ae6ba3..91e0287 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
@@ -99,7 +99,7 @@
}
public void createCheckpoint(String targetPath) {
- Path parentName = Paths.get(targetPath).getParent().getFileName();
+ Path parentName = Paths.get(targetPath).toAbsolutePath().getParent().getFileName();
assert parentName.toString().startsWith("snapshot") : targetPath;
// https://github.com/facebook/rocksdb/wiki/Checkpoints
try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) {
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
index 71a6690..bcbe37b 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
@@ -295,9 +295,9 @@
public String buildSnapshotPath(String snapshotPrefix) {
// Like: parent_path/rocksdb-data/*, * can be g,m,s
Path originDataPath = Paths.get(this.dataPath);
- Path parentParentPath = originDataPath.getParent().getParent();
+ Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
// Like: rocksdb-data/*
- Path pureDataPath = parentParentPath.relativize(originDataPath);
+ Path pureDataPath = parentParentPath.relativize(originDataPath.toAbsolutePath());
// Like: parent_path/snapshot_rocksdb-data/*
Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
pureDataPath);
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 4158c7d..2dba5fa 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -729,7 +729,7 @@
for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
// Like: parent_path/rocksdb-data/*, * maybe g,m,s
Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath();
- Path parentParentPath = originDataPath.getParent().getParent();
+ Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
// Like: rocksdb-data/*
Path pureDataPath = parentParentPath.relativize(originDataPath);
// Like: parent_path/snapshot_rocksdb-data/*
@@ -740,7 +740,7 @@
RocksDBSessions sessions = entry.getValue();
sessions.createSnapshot(snapshotPath.toString());
- String snapshotDir = snapshotPath.getParent().toString();
+ String snapshotDir = snapshotPath.toAbsolutePath().getParent().toString();
// Find correspond data HugeType key
String diskTableKey = this.findDiskTableKeyByPath(
entry.getKey());
@@ -781,7 +781,7 @@
if (deleteSnapshot) {
// Delete empty snapshot parent directory
- Path parentPath = Paths.get(snapshotPath).getParent();
+ Path parentPath = Paths.get(snapshotPath).toAbsolutePath().getParent();
if (Files.list(parentPath).count() == 0) {
FileUtils.deleteDirectory(parentPath.toFile());
}
@@ -866,7 +866,7 @@
diskMapping.put(TABLE_GENERAL_KEY, this.dataPath);
for (Map.Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
String key = this.store + "/" + e.getKey().name();
- String value = Paths.get(e.getValue()).getParent().toString();
+ String value = Paths.get(e.getValue()).toAbsolutePath().getParent().toString();
diskMapping.put(key, value);
}
return diskMapping;
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
index 393cb2e..3d2b7f8 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
@@ -108,7 +108,7 @@
Path sstFile = Paths.get(this.dataPath, table,
number + RocksDBIngester.SST);
try {
- FileUtils.forceMkdir(sstFile.getParent().toFile());
+ FileUtils.forceMkdir(sstFile.toAbsolutePath().getParent().toFile());
} catch (IOException e) {
throw new BackendException("Can't make directory for sst: '%s'",
e, sstFile.toString());