refact(core): optimized batch removal of remaining indices consumed by a single consumer (#2203)

diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
index 0b45f7f..e655b7c 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/HugeGraphParams.java
@@ -23,6 +23,7 @@
 import org.apache.hugegraph.backend.store.ram.RamTable;
 import org.apache.hugegraph.backend.tx.GraphTransaction;
 import org.apache.hugegraph.backend.tx.SchemaTransaction;
+import org.apache.hugegraph.job.EphemeralJob;
 import org.apache.hugegraph.task.ServerInfoManager;
 import org.apache.hugegraph.type.define.GraphMode;
 import org.apache.hugegraph.type.define.GraphReadMode;
@@ -90,4 +91,6 @@
     RateLimiter readRateLimiter();
 
     RamTable ramtable();
+
+    <T> void submitEphemeralJob(EphemeralJob<T> job);
 }
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index c1d0108..42a6715 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -51,6 +51,7 @@
 import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider;
 import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
 import org.apache.hugegraph.backend.store.ram.RamTable;
+import org.apache.hugegraph.task.EphemeralJobQueue;
 import org.apache.hugegraph.backend.tx.GraphTransaction;
 import org.apache.hugegraph.backend.tx.SchemaTransaction;
 import org.apache.hugegraph.config.CoreOptions;
@@ -60,6 +61,7 @@
 import org.apache.hugegraph.event.EventListener;
 import org.apache.hugegraph.exception.NotAllowException;
 import org.apache.hugegraph.io.HugeGraphIoRegistry;
+import org.apache.hugegraph.job.EphemeralJob;
 import org.apache.hugegraph.masterelection.ClusterRoleStore;
 import org.apache.hugegraph.masterelection.Config;
 import org.apache.hugegraph.masterelection.RoleElectionConfig;
@@ -1163,6 +1165,7 @@
     private class StandardHugeGraphParams implements HugeGraphParams {
 
         private HugeGraph graph = StandardHugeGraph.this;
+        private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this);
 
         private void graph(HugeGraph graph) {
             this.graph = graph;
@@ -1304,6 +1307,11 @@
         public RamTable ramtable() {
             return StandardHugeGraph.this.ramtable;
         }
+
+        @Override
+        public <T> void submitEphemeralJob(EphemeralJob<T> job) {
+            this.ephemeralJobQueue.add(job);
+        }
     }
 
     private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
index 6cf08f1..7c8d604 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java
@@ -164,7 +164,7 @@
             try {
                 LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
                 long begin = System.currentTimeMillis();
-                String rootDir = Paths.get(snapshotDir).getParent().toString();
+                String rootDir = Paths.get(snapshotDir).toAbsolutePath().getParent().toString();
                 String sourceDir = Paths.get(snapshotDir).getFileName().toString();
                 CompressStrategyManager.getDefault()
                                        .compressZip(rootDir, sourceDir, outputFile, checksum);
@@ -200,7 +200,7 @@
         E.checkArgument(this.dataDisks.containsKey(diskTableKey),
                         "The data path for '%s' should be exist", diskTableKey);
         String dataPath = this.dataDisks.get(diskTableKey);
-        String parentPath = Paths.get(dataPath).getParent().toString();
+        String parentPath = Paths.get(dataPath).toAbsolutePath().getParent().toString();
         String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
                                   .toString();
         FileUtils.deleteDirectory(new File(snapshotDir));
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
index 389a2ec..b6809e6 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/rpc/RpcForwarder.java
@@ -77,7 +77,14 @@
             public void setResponse(StoreCommandResponse response) {
                 if (response.getStatus()) {
                     LOG.debug("StoreCommandResponse status ok");
-                    future.complete(Status.OK(), () -> null);
+                    // This code forwards the request to the Raft leader and considers the operation successful
+                    // if it's forwarded successfully. It returns a RaftClosure because the calling
+                    // logic expects a RaftClosure result. Specifically, if the current instance is the Raft leader,
+                    // it executes the corresponding logic locally and notifies the calling logic asynchronously
+                    // via RaftClosure. Therefore, the result is returned as a RaftClosure here.
+                    RaftClosure<Status> supplierFuture = new RaftClosure<>();
+                    supplierFuture.complete(Status.OK());
+                    future.complete(Status.OK(), () -> supplierFuture);
                 } else {
                     LOG.debug("StoreCommandResponse status error");
                     Status status = new Status(RaftError.UNKNOWN,
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
index f4ff8b3..bc0bc0b 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphIndexTransaction.java
@@ -36,6 +36,7 @@
 import org.apache.hugegraph.backend.page.PageState;
 import org.apache.hugegraph.backend.store.BackendEntry;
 import org.apache.hugegraph.backend.store.BackendStore;
+import org.apache.hugegraph.task.EphemeralJobQueue;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
@@ -69,7 +70,6 @@
 import org.apache.hugegraph.exception.NotSupportException;
 import org.apache.hugegraph.iterator.Metadatable;
 import org.apache.hugegraph.job.EphemeralJob;
-import org.apache.hugegraph.job.EphemeralJobBuilder;
 import org.apache.hugegraph.job.system.DeleteExpiredJob;
 import org.apache.hugegraph.perf.PerfUtil.Watched;
 import org.apache.hugegraph.schema.IndexLabel;
@@ -81,7 +81,6 @@
 import org.apache.hugegraph.structure.HugeIndex.IdWithExpiredTime;
 import org.apache.hugegraph.structure.HugeProperty;
 import org.apache.hugegraph.structure.HugeVertex;
-import org.apache.hugegraph.task.HugeTask;
 import org.apache.hugegraph.type.HugeType;
 import org.apache.hugegraph.type.define.Action;
 import org.apache.hugegraph.type.define.HugeKeys;
@@ -115,15 +114,11 @@
              conf.get(CoreOptions.QUERY_INDEX_INTERSECT_THRESHOLD);
     }
 
-    protected Id asyncRemoveIndexLeft(ConditionQuery query,
-                                      HugeElement element) {
+    protected void asyncRemoveIndexLeft(ConditionQuery query,
+                                        HugeElement element) {
         LOG.info("Remove left index: {}, query: {}", element, query);
         RemoveLeftIndexJob job = new RemoveLeftIndexJob(query, element);
-        HugeTask<?> task = EphemeralJobBuilder.of(this.graph())
-                                              .name(element.id().asString())
-                                              .job(job)
-                                              .schedule();
-        return task.id();
+        this.params().submitEphemeralJob(job);
     }
 
     @Watched(prefix = "index")
@@ -1717,7 +1712,8 @@
         }
     }
 
-    public static class RemoveLeftIndexJob extends EphemeralJob<Object> {
+    public static class RemoveLeftIndexJob extends EphemeralJob<Long>
+                                           implements EphemeralJobQueue.Reduce<Long> {
 
         private static final String REMOVE_LEFT_INDEX = "remove_left_index";
 
@@ -1741,7 +1737,7 @@
         }
 
         @Override
-        public Object execute() {
+        public Long execute() {
             this.tx = this.element.schemaLabel().system() ?
                       this.params().systemTransaction().indexTransaction() :
                       this.params().graphTransaction().indexTransaction();
@@ -1780,7 +1776,6 @@
                 // Process secondary index or search index
                 sCount += this.processSecondaryOrSearchIndexLeft(cq, element);
             }
-            this.tx.commit();
             return rCount + sCount;
         }
 
@@ -1808,7 +1803,6 @@
             }
             // Remove LeftIndex after constructing remove job
             this.query.removeElementLeftIndex(element.id());
-            this.tx.commit();
             return count;
         }
 
@@ -1873,11 +1867,9 @@
                      */
                     this.tx.updateIndex(il.id(), element, false);
                 }
-                this.tx.commit();
                 if (this.deletedByError(element, incorrectIndexFields,
                                         incorrectPKs)) {
                     this.tx.updateIndex(il.id(), deletion, false);
-                    this.tx.commit();
                 } else {
                     count++;
                 }
@@ -1949,5 +1941,18 @@
                 return (HugeEdge) QueryResults.one(iter);
             }
         }
+
+        @Override
+        public Long reduce(Long t1, Long t2) {
+            if (t1 == null) {
+                return t2;
+            }
+
+            if (t2 == null) {
+                return t1;
+            }
+
+            return t1 + t2;
+        }
     }
 }
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java
new file mode 100644
index 0000000..70f4907
--- /dev/null
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/task/EphemeralJobQueue.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hugegraph.task;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hugegraph.HugeGraphParams;
+import org.apache.hugegraph.backend.query.Query;
+import org.apache.hugegraph.backend.tx.GraphTransaction;
+import org.apache.hugegraph.job.EphemeralJob;
+import org.apache.hugegraph.job.EphemeralJobBuilder;
+import org.apache.hugegraph.util.Log;
+import org.slf4j.Logger;
+
+public class EphemeralJobQueue {
+
+    private static final Logger LOG = Log.logger(EphemeralJobQueue.class);
+
+    private static final long CAPACITY = 100 * Query.COMMIT_BATCH;
+
+    private final BlockingQueue<EphemeralJob<?>> pendingQueue;
+
+    private final AtomicReference<State> state;
+
+    private final HugeGraphParams graph;
+
+    private enum State {
+        INIT,
+        EXECUTE,
+    }
+
+    public EphemeralJobQueue(HugeGraphParams graph) {
+        this.state = new AtomicReference<>(State.INIT);
+        this.graph = graph;
+        this.pendingQueue = new ArrayBlockingQueue<>((int) CAPACITY);
+    }
+
+    public boolean add(EphemeralJob<?> job) {
+        if (job == null) {
+            return false;
+        }
+
+        if (!this.pendingQueue.offer(job)) {
+            LOG.warn("The pending queue of EphemeralJobQueue is full, {} job " +
+                     "will be ignored", job.type());
+            return false;
+        }
+
+        this.reScheduleIfNeeded();
+        return true;
+    }
+
+    protected HugeGraphParams params() {
+        return this.graph;
+    }
+
+    protected void clear() {
+        this.pendingQueue.clear();
+    }
+
+    protected EphemeralJob<?> poll() {
+        return this.pendingQueue.poll();
+    }
+
+    public void consumeComplete() {
+        this.state.compareAndSet(State.EXECUTE, State.INIT);
+    }
+
+    public void reScheduleIfNeeded() {
+        if (this.state.compareAndSet(State.INIT, State.EXECUTE)) {
+            try {
+                BatchEphemeralJob job = new BatchEphemeralJob(this);
+                EphemeralJobBuilder.of(this.graph.graph())
+                                   .name("batch-ephemeral-job")
+                                   .job(job)
+                                   .schedule();
+            } catch (Throwable e) {
+                // Maybe if it fails, consider clearing all the data in the pendingQueue,
+                // or start a scheduled retry task to retry until success.
+                LOG.warn("Failed to schedule BatchEphemeralJob", e);
+                this.pendingQueue.clear();
+                this.state.compareAndSet(State.EXECUTE, State.INIT);
+            }
+        }
+    }
+
+    public boolean isEmpty() {
+        return this.pendingQueue.isEmpty();
+    }
+
+    public static class BatchEphemeralJob extends EphemeralJob<Object> {
+
+        private static final long PAGE_SIZE = Query.COMMIT_BATCH;
+        private static final String BATCH_EPHEMERAL_JOB = "batch-ephemeral-job";
+        private static final long MAX_CONSUME_COUNT = 2 * PAGE_SIZE;
+
+        private WeakReference<EphemeralJobQueue> queueWeakReference;
+
+        public BatchEphemeralJob(EphemeralJobQueue queue) {
+            this.queueWeakReference = new WeakReference<>(queue);
+        }
+
+        @Override
+        public String type() {
+            return BATCH_EPHEMERAL_JOB;
+        }
+
+        @Override
+        public Object execute() throws Exception {
+            boolean stop = false;
+            Object result = null;
+            int consumeCount = 0;
+            InterruptedException interruptedException = null;
+            EphemeralJobQueue queue;
+            List<EphemeralJob<?>> batchJobs = new ArrayList<>();
+            while (!stop) {
+                if (interruptedException == null && Thread.currentThread().isInterrupted()) {
+                    interruptedException = new InterruptedException();
+                }
+
+                queue = this.queueWeakReference.get();
+                if (queue == null) {
+                    stop = true;
+                    continue;
+                }
+
+                if (queue.isEmpty() || consumeCount > MAX_CONSUME_COUNT ||
+                    interruptedException != null) {
+                    queue.consumeComplete();
+                    stop = true;
+                    if (!queue.isEmpty()) {
+                        queue.reScheduleIfNeeded();
+                    }
+                    continue;
+                }
+
+                try {
+                    while (!queue.isEmpty() && batchJobs.size() < PAGE_SIZE) {
+                        EphemeralJob<?> job = queue.poll();
+                        if (job == null) {
+                            continue;
+                        }
+                        batchJobs.add(job);
+                    }
+
+                    if (batchJobs.isEmpty()) {
+                        continue;
+                    }
+
+                    consumeCount += batchJobs.size();
+                    result = this.executeBatchJob(batchJobs, result);
+
+                } catch (InterruptedException e) {
+                    interruptedException = e;
+                } finally {
+                    batchJobs.clear();
+                }
+            }
+
+            if (interruptedException != null) {
+                Thread.currentThread().interrupt();
+                throw interruptedException;
+            }
+
+            return result;
+        }
+
+        private Object executeBatchJob(List<EphemeralJob<?>> jobs, Object prevResult) throws Exception {
+            GraphTransaction graphTx = this.params().systemTransaction();
+            GraphTransaction systemTx = this.params().graphTransaction();
+            Object result = prevResult;
+            for (EphemeralJob<?> job : jobs) {
+                this.initJob(job);
+                Object obj = job.call();
+                if (job instanceof Reduce) {
+                    result = ((Reduce) job).reduce(result, obj);
+                }
+            }
+
+            graphTx.commit();
+            systemTx.commit();
+
+            return result;
+        }
+
+        private void initJob(EphemeralJob<?> job) {
+            job.graph(this.graph());
+            job.params(this.params());
+        }
+
+        @Override
+        public Object call() throws Exception {
+            try {
+                return super.call();
+            } catch (Throwable e) {
+                LOG.warn("Failed to execute BatchEphemeralJob", e);
+                EphemeralJobQueue queue = this.queueWeakReference.get();
+                if (e instanceof InterruptedException) {
+                    Thread.currentThread().interrupt();
+                    if (queue != null) {
+                        queue.clear();
+                        queue.consumeComplete();
+                    }
+                    throw e;
+                }
+
+                if (queue != null) {
+                    queue.consumeComplete();
+                    if (!queue.isEmpty()) {
+                        queue.reScheduleIfNeeded();
+                    }
+                }
+                throw e;
+            }
+        }
+    }
+
+    public interface Reduce<T> {
+        T reduce(T t1,  T t2);
+    }
+}
diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
index 2a01ba6..0f5c179 100644
--- a/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
+++ b/hugegraph-core/src/main/java/org/apache/hugegraph/util/CompressUtil.java
@@ -145,7 +145,7 @@
                     Files.createDirectories(newPath);
                 } else {
                     // check parent folder again
-                    Path parent = newPath.getParent();
+                    Path parent = newPath.toAbsolutePath().getParent();
                     if (parent != null) {
                         if (Files.notExists(parent)) {
                             Files.createDirectories(parent);
@@ -176,7 +176,7 @@
 
     public static void compressZip(String inputDir, String outputFile,
                                    Checksum checksum) throws IOException {
-        String rootDir = Paths.get(inputDir).getParent().toString();
+        String rootDir = Paths.get(inputDir).toAbsolutePath().getParent().toString();
         String sourceDir = Paths.get(inputDir).getFileName().toString();
         compressZip(rootDir, sourceDir, outputFile, checksum);
     }
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
index 3ae6ba3..91e0287 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/OpenedRocksDB.java
@@ -99,7 +99,7 @@
     }
 
     public void createCheckpoint(String targetPath) {
-        Path parentName = Paths.get(targetPath).getParent().getFileName();
+        Path parentName = Paths.get(targetPath).toAbsolutePath().getParent().getFileName();
         assert parentName.toString().startsWith("snapshot") : targetPath;
         // https://github.com/facebook/rocksdb/wiki/Checkpoints
         try (Checkpoint checkpoint = Checkpoint.create(this.rocksdb)) {
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
index 71a6690..bcbe37b 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStdSessions.java
@@ -295,9 +295,9 @@
     public String buildSnapshotPath(String snapshotPrefix) {
         // Like: parent_path/rocksdb-data/*, * can be g,m,s
         Path originDataPath = Paths.get(this.dataPath);
-        Path parentParentPath = originDataPath.getParent().getParent();
+        Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
         // Like: rocksdb-data/*
-        Path pureDataPath = parentParentPath.relativize(originDataPath);
+        Path pureDataPath = parentParentPath.relativize(originDataPath.toAbsolutePath());
         // Like: parent_path/snapshot_rocksdb-data/*
         Path snapshotPath = parentParentPath.resolve(snapshotPrefix + "_" +
                                                      pureDataPath);
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
index 4158c7d..2dba5fa 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBStore.java
@@ -729,7 +729,7 @@
             for (Map.Entry<String, RocksDBSessions> entry : this.dbs.entrySet()) {
                 // Like: parent_path/rocksdb-data/*, * maybe g,m,s
                 Path originDataPath = Paths.get(entry.getKey()).toAbsolutePath();
-                Path parentParentPath = originDataPath.getParent().getParent();
+                Path parentParentPath = originDataPath.toAbsolutePath().getParent().getParent();
                 // Like: rocksdb-data/*
                 Path pureDataPath = parentParentPath.relativize(originDataPath);
                 // Like: parent_path/snapshot_rocksdb-data/*
@@ -740,7 +740,7 @@
                 RocksDBSessions sessions = entry.getValue();
                 sessions.createSnapshot(snapshotPath.toString());
 
-                String snapshotDir = snapshotPath.getParent().toString();
+                String snapshotDir = snapshotPath.toAbsolutePath().getParent().toString();
                 // Find correspond data HugeType key
                 String diskTableKey = this.findDiskTableKeyByPath(
                                       entry.getKey());
@@ -781,7 +781,7 @@
 
                 if (deleteSnapshot) {
                     // Delete empty snapshot parent directory
-                    Path parentPath = Paths.get(snapshotPath).getParent();
+                    Path parentPath = Paths.get(snapshotPath).toAbsolutePath().getParent();
                     if (Files.list(parentPath).count() == 0) {
                         FileUtils.deleteDirectory(parentPath.toFile());
                     }
@@ -866,7 +866,7 @@
         diskMapping.put(TABLE_GENERAL_KEY, this.dataPath);
         for (Map.Entry<HugeType, String> e : this.tableDiskMapping.entrySet()) {
             String key = this.store + "/" + e.getKey().name();
-            String value = Paths.get(e.getValue()).getParent().toString();
+            String value = Paths.get(e.getValue()).toAbsolutePath().getParent().toString();
             diskMapping.put(key, value);
         }
         return diskMapping;
diff --git a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
index 393cb2e..3d2b7f8 100644
--- a/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
+++ b/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdbsst/RocksDBSstSessions.java
@@ -108,7 +108,7 @@
         Path sstFile = Paths.get(this.dataPath, table,
                                  number + RocksDBIngester.SST);
         try {
-            FileUtils.forceMkdir(sstFile.getParent().toFile());
+            FileUtils.forceMkdir(sstFile.toAbsolutePath().getParent().toFile());
         } catch (IOException e) {
             throw new BackendException("Can't make directory for sst: '%s'",
                                        e, sstFile.toString());