Cache LevelDB Snapshot reference.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index b3c6083..ca2d209 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -42,6 +42,7 @@
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.hdfsdb.Options;
+import org.apache.hadoop.hdfs.hdfsdb.Snapshot;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -74,6 +75,7 @@
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
@@ -186,11 +188,61 @@
   private INodeAttributeProvider attributeProvider;
   private final boolean enableLevelDb;
   private final org.apache.hadoop.hdfs.hdfsdb.DB levelDb;
+  /**
+   * LevelDB snapshot shared by read-only transactions; {@code null} until
+   * one is requested and again after it has been cleared.
+   */
+  private final AtomicReference<Snapshot> currentSnapshot =
+      new AtomicReference<>();
 
   org.apache.hadoop.hdfs.hdfsdb.DB getLevelDb() {
     return levelDb;
   }
 
+  /**
+   * Returns the cached LevelDB snapshot, creating it on first use.
+   * The caller must hold the directory read lock (checked by assertion).
+   *
+   * @return the snapshot currently shared by readers
+   */
+  Snapshot currentLevelDbSnapshot() {
+    assert hasReadLock();
+    Snapshot snapshot = currentSnapshot.get();
+    if (snapshot == null) {
+      // Serialize creation so that racing readers end up sharing a single
+      // snapshot instead of each taking one that is never closed.
+      synchronized (currentSnapshot) {
+        snapshot = currentSnapshot.get();
+        if (snapshot == null) {
+          snapshot = levelDb.snapshot();
+          currentSnapshot.set(snapshot);
+        }
+      }
+    }
+    return snapshot;
+  }
+
+  /**
+   * Closes and forgets the cached LevelDB snapshot, if any, so that the next
+   * reader takes a fresh one. The caller must hold the directory write lock
+   * (checked by assertion).
+   *
+   * @throws RuntimeException wrapping any failure to close the snapshot
+   */
+  void clearCurrentLevelDBSnapshot() {
+    assert hasWriteLock();
+    // Detach before closing: if close() fails, the reference must not keep
+    // pointing at a snapshot that may already be released.
+    Snapshot stale = currentSnapshot.getAndSet(null);
+    if (stale != null) {
+      try {
+        stale.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   public void setINodeAttributeProvider(INodeAttributeProvider provider) {
     attributeProvider = provider;
   }
@@ -438,7 +465,29 @@
    * Shutdown the filestore
    */
   @Override
-  public void close() throws IOException {}
+  public void close() throws IOException {
+    if (!enableLevelDb) {
+      return;
+    }
+    writeLock();
+    try {
+      // Detach the cached snapshot so it cannot be handed out after shutdown.
+      Snapshot cached = currentSnapshot.getAndSet(null);
+      try {
+        if (cached != null) {
+          cached.close();
+        }
+      } catch (Exception e) {
+        throw new IOException(e);
+      } finally {
+        // Close the database even when releasing the snapshot failed;
+        // otherwise the LevelDB handle would leak.
+        levelDb.close();
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
 
   void markNameCacheInitialized() {
     writeLock();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
index 50d8c30..174443a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
@@ -30,7 +30,6 @@
 class LevelDBROTransaction extends ROTransaction {
   private final org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb;
 
-  private Snapshot snapshot;
   private final ReadOptions options = new ReadOptions();
   public static final ReadOptions OPTIONS = new ReadOptions();
 
@@ -40,8 +39,25 @@
   }
 
+  /**
+   * Starts the transaction: takes the directory read lock and pins reads to
+   * the snapshot cached by the directory. The lock is released by
+   * {@link #close()}.
+   *
+   * @return this transaction
+   */
   LevelDBROTransaction begin() {
-    snapshot = hdfsdb.snapshot();
-    options.snapshot(snapshot);
+    fsd.readLock();
+    boolean started = false;
+    try {
+      options.snapshot(fsd.currentLevelDbSnapshot());
+      started = true;
+    } finally {
+      if (!started) {
+        // Do not leave the read lock held when begin() fails; the caller
+        // never receives a transaction it could close.
+        fsd.readUnlock();
+      }
+    }
     return this;
   }
 
@@ -137,10 +136,6 @@
 
   @Override
   public void close() throws IOException {
-    try {
-      snapshot.close();
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
+    fsd.readUnlock(); // releases the read lock acquired in begin()
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
index 4c8a2d5..10ebc77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
@@ -80,6 +80,7 @@
   void commit() {
     try {
       hdfsdb.write(WRITE_OPTIONS, batch);
+      fsd.clearCurrentLevelDBSnapshot();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }