WIP Various fixes and performance improvements.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 0e50d8c..b3c6083 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -348,7 +348,17 @@
     this.enableLevelDb = conf.getBoolean("dfs.partialns", false);
     if (enableLevelDb) {
       String dbPath = conf.get("dfs.partialns.path");
-      Options options = new Options().createIfMissing(true);
+      int writeBufferSize = conf.getInt("dfs.partialns.writebuffer",
+                                        4096 * 1024);
+      long blockCacheSize = conf.getLong(
+          "dfs.partialns.blockcache", 0);
+      Options options = new Options().createIfMissing(true)
+          .writeBufferSize(writeBufferSize);
+
+      if (blockCacheSize != 0) {
+        options.blockCacheSize(blockCacheSize);
+      }
+
       this.levelDb = org.apache.hadoop.hdfs.hdfsdb.DB.open(options, dbPath);
       try (RWTransaction tx = newRWTransaction().begin()) {
         tx.putINode(ROOT_INODE_ID, createRootForFlatNS(ns));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
index f55ed63..50d8c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
@@ -29,37 +29,43 @@
 
 class LevelDBROTransaction extends ROTransaction {
   private final org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb;
-  private static final ReadOptions OPTIONS = new ReadOptions();
+
+  private Snapshot snapshot;
+  private final ReadOptions options = new ReadOptions();
+  public static final ReadOptions OPTIONS = new ReadOptions();
+
   LevelDBROTransaction(FSDirectory fsd, org.apache.hadoop.hdfs.hdfsdb.DB db) {
     super(fsd);
     this.hdfsdb = db;
   }
 
   LevelDBROTransaction begin() {
-    fsd.readLock();
+    snapshot = hdfsdb.snapshot();
+    options.snapshot(snapshot);
     return this;
   }
 
   @Override
   FlatINode getINode(long id) {
-    return getFlatINode(id, hdfsdb);
+    return getFlatINode(id, hdfsdb, options);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return getChild(parentId, localName, hdfsdb);
+    return getChild(parentId, localName, hdfsdb, options);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return getChildrenView(parent, hdfsdb);
+    return getChildrenView(parent, hdfsdb, options);
   }
 
   static FlatINode getFlatINode(
-      long id, org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb) {
+      long id, DB hdfsdb, ReadOptions options) {
     byte[] key = inodeKey(id);
     try {
-      byte[] bytes = hdfsdb.get(OPTIONS, key);
+      byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key) : hdfsdb
+          .snapshotGet(options, key);
       if (bytes == null) {
         return null;
       }
@@ -83,11 +89,13 @@
       };
   }
 
-  static long getChild(long parentId, ByteBuffer localName, DB hdfsdb) {
+  static long getChild(
+      long parentId, ByteBuffer localName, DB hdfsdb, ReadOptions options) {
     Preconditions.checkArgument(localName.hasRemaining());
     byte[] key = inodeChildKey(parentId, localName);
     try {
-      byte[] bytes = hdfsdb.get(OPTIONS, key);
+      byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key) : hdfsdb
+          .snapshotGet(options, key);
       if (bytes == null) {
         return INVALID_INODE_ID;
       }
@@ -109,7 +117,8 @@
     return key;
   }
 
-  static DBChildrenView getChildrenView(long parent, DB hdfsdb) {
+  static DBChildrenView getChildrenView(
+      long parent, DB hdfsdb, ReadOptions options) {
     byte[] key = new byte[]{'I',
         (byte) ((parent >> 56) & 0xff),
         (byte) ((parent >> 48) & 0xff),
@@ -121,9 +130,17 @@
         (byte) (parent & 0xff),
         1
     };
-    Iterator it = hdfsdb.iterator(OPTIONS);
+    Iterator it = hdfsdb.iterator(options);
     it.seek(key);
     return new LevelDBChildrenView(parent, it);
   }
 
+  @Override
+  public void close() throws IOException {
+    try {
+      snapshot.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
index 3f14cff..4c8a2d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
@@ -37,17 +37,20 @@
 
   @Override
   FlatINode getINode(long id) {
-    return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+    return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+                                             LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+                                         LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+    return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+                                                LevelDBROTransaction.OPTIONS);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
index e1b8eff..d486010 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
@@ -38,17 +38,20 @@
 
   @Override
   FlatINode getINode(long id) {
-    return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+    return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+                                             LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   long getChild(long parentId, ByteBuffer localName) {
-    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+    return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+                                         LevelDBROTransaction.OPTIONS);
   }
 
   @Override
   DBChildrenView childrenView(long parent) {
-    return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+    return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+                                                LevelDBROTransaction.OPTIONS);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
index 3278111..743bf0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
@@ -31,6 +31,10 @@
 
   @Override
   public Iterator<Map.Entry<ByteBuffer, Long>> iterator() {
-    return childrenMap.tailMap(start).entrySet().iterator();
+    if (start == null) {
+      return childrenMap.entrySet().iterator();
+    } else {
+      return childrenMap.tailMap(start).entrySet().iterator();
+    }
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
new file mode 100644
index 0000000..51c4dba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
@@ -0,0 +1,83 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.hdfsdb.*;
+import org.apache.log4j.Level;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * Created by hmai on 6/3/15.
+ */
+public class LevelDBProfile {
+  private static final String DB_PATH = "/Users/hmai/work/test/partialnsdb";
+  private static final int TIMES = 300000;
+  public static void main(String[] args) throws Exception {
+    MiniDFSCluster cluster = null;
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean("dfs.partialns", true);
+    conf.set("dfs.partialns.path", DB_PATH);
+    conf.setInt("dfs.partialns.writebuffer", 8388608 * 16);
+    conf.setLong("dfs.partialns.blockcache", 4294967296L);
+    ExecutorService executor = Executors.newFixedThreadPool(8, new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, "Executor");
+      }
+    });
+    ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
+
+    try {
+      FileUtils.deleteDirectory(new File(DB_PATH));
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final org.apache.hadoop.hdfs.hdfsdb.DB db = cluster.getNamesystem().getFSDirectory().getLevelDb();
+      final Path PATH = new Path("/foo");
+      final byte[] p = new byte[20];
+      try (OutputStream os = fs.create(PATH)) {
+      }
+      cluster.shutdownDataNodes();
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final Runnable getFileStatus = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            fsn.getFileInfo("/foo", true);
+            //fs.getFileStatus(PATH);
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      };
+
+      long start = monotonicNow();
+      for (int i = 0; i < TIMES; ++i) {
+        executor.submit(getFileStatus);
+      }
+      executor.shutdown();
+      executor.awaitTermination(1, TimeUnit.HOURS);
+      long end = monotonicNow();
+      System.err.println("Time: " + (end - start) + " ms");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
index 0355dcc..57ec71e 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
@@ -21,7 +21,8 @@
 
 public class DB extends NativeObject {
   public static DB open(Options options, String path) throws IOException {
-    return new DB(open(options.nativeHandle(), path));
+    long handle = open(options.nativeHandle(), path);
+    return new DB(handle);
   }
 
   @Override
@@ -36,6 +37,11 @@
     return get(nativeHandle, options.nativeHandle(), key);
   }
 
+  public byte[] snapshotGet(ReadOptions options, byte[] key) throws
+                                                             IOException {
+    return snapshotGet(nativeHandle, options.nativeHandle(), key);
+  }
+
   public void write(WriteOptions options, WriteBatch batch) throws IOException {
     write(nativeHandle, options.nativeHandle(), batch.nativeHandle());
   }
@@ -52,6 +58,14 @@
     return new Iterator(newIterator(nativeHandle, options.nativeHandle()));
   }
 
+  public Snapshot snapshot() {
+    return new Snapshot(nativeHandle, newSnapshot(nativeHandle));
+  }
+
+  public byte[] dbGetTest(byte[] key) throws IOException {
+    return getTest(nativeHandle, key);
+  }
+
   private DB(long handle) {
     super(handle);
   }
@@ -60,6 +74,8 @@
   private static native void close(long handle);
   private static native byte[] get(long handle, long options,
                                    byte[] key) throws IOException;
+  private static native byte[] snapshotGet(long handle, long options,
+      byte[] key) throws IOException;
   private static native void write(long handle, long options,
                                    long batch) throws IOException;
   private static native void put(long handle, long options,
@@ -67,4 +83,9 @@
   private static native void delete(long handle, long options,
                                     byte[] key);
   private static native long newIterator(long handle, long readOptions);
+  private static native long newSnapshot(long handle);
+  static native void releaseSnapshot(long handle, long snapshotHandle);
+
+  private static native byte[] getTest(long handle, byte[] key) throws IOException;
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
index a12da61..626e4d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
@@ -56,6 +56,11 @@
     return this;
   }
 
+  public Options blockCacheSize(long capacity) {
+    blockCacheSize(nativeHandle, capacity);
+    return this;
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -70,4 +75,5 @@
   private static native void compressionType(long handle, int value);
   private static native void writeBufferSize(long handle, int value);
   private static native void blockSize(long handle, int value);
+  private static native void blockCacheSize(long handle, long capacity);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
index e97e05f..b2e6726 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
@@ -22,6 +22,11 @@
     super(construct());
   }
 
+  public ReadOptions snapshot(Snapshot snapshot) {
+    snapshot(nativeHandle, snapshot.nativeHandle);
+    return this;
+  }
+
   @Override
   public void close() {
     if (nativeHandle != 0) {
@@ -32,4 +37,5 @@
 
   private static native long construct();
   private static native void destruct(long handle);
+  private static native void snapshot(long handle, long snapshot);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
new file mode 100644
index 0000000..ad370b3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.hdfsdb;
+
+public class Snapshot extends NativeObject {
+  private final long dbHandle;
+  Snapshot(long dbHandle, long nativeHandle) {
+    super(nativeHandle);
+    this.dbHandle = dbHandle;
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (nativeHandle != 0) {
+      DB.releaseSnapshot(dbHandle, nativeHandle);
+      nativeHandle = 0;
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
index 1225412..2d0bef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
@@ -1121,6 +1121,52 @@
   return s;
 }
 
+Status DBImpl::SnapshotGet(const ReadOptions& options,
+                           const Slice& key,
+                           const std::function<void(const Slice&)> &get_value) {
+  Status s;
+  assert(options.snapshot);
+  SequenceNumber snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
+  LookupKey lkey(key, snapshot);
+
+  //mutex_.Lock();
+  MemTable* mem = mem_;
+  MemTable* imm = imm_;
+  //mutex_.Unlock();
+  mem->Ref();
+  if (imm != NULL) imm->Ref();
+
+  // First look in the memtable, then in the immutable memtable (if any).
+  if (mem->Get(lkey, get_value, &s)) {
+    // Done
+  } else if (imm != NULL && imm->Get(lkey, get_value, &s)) {
+    // Done
+  } else {
+    assert (false);
+    mutex_.Lock();
+    Version* current = versions_->current();
+    current->Ref();
+    // Unlock while reading from files and memtables
+    mutex_.Unlock();
+    Version::GetStats stats;
+    std::string value;
+    s = current->Get(options, lkey, &value, &stats);
+    if (value.size()) {
+      get_value(Slice(value));
+    }
+    mutex_.Lock();
+    if (current->UpdateStats(stats)) {
+      MaybeScheduleCompaction();
+    }
+    current->Unref();
+    mutex_.Unlock();
+  }
+
+  mem->Unref();
+  if (imm != NULL) imm->Unref();
+  return s;
+}
+
 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
   SequenceNumber latest_snapshot;
   uint32_t seed;
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
index cfc9981..21a6c5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
@@ -35,6 +35,10 @@
   virtual Status Get(const ReadOptions& options,
                      const Slice& key,
                      std::string* value);
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key,
+                             const std::function<void(const Slice&)>
+                             &get_value);
   virtual Iterator* NewIterator(const ReadOptions&);
   virtual const Snapshot* GetSnapshot();
   virtual void ReleaseSnapshot(const Snapshot* snapshot);
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
index 280b01c..3462fe7 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
@@ -1848,6 +1848,14 @@
     assert(false);      // Not implemented
     return Status::NotFound(key);
   }
+
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key, const
+                             std::function<void(const Slice&)> &) {
+    assert(false);      // Not implemented
+    return Status::NotFound(key);
+  }
+
   virtual Iterator* NewIterator(const ReadOptions& options) {
     if (options.snapshot == NULL) {
       KVMap* saved = new KVMap;
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
index bfec0a7..9cabcae 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
@@ -106,6 +106,12 @@
 }
 
 bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
+  return Get(key, [value](const Slice &v) {
+          value->assign(v.data(), v.size()); }, s);
+}
+
+bool MemTable::Get(const LookupKey& key,
+ const std::function<void(const Slice&)> &get_value, Status* s) {
   Slice memkey = key.memtable_key();
   Table::Iterator iter(&table_);
   iter.Seek(memkey.data());
@@ -130,7 +136,7 @@
       switch (static_cast<ValueType>(tag & 0xff)) {
         case kTypeValue: {
           Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
-          value->assign(v.data(), v.size());
+          get_value(v);
           return true;
         }
         case kTypeDeletion:
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
index 92e90bb..31835bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
@@ -6,6 +6,9 @@
 #define STORAGE_LEVELDB_DB_MEMTABLE_H_
 
 #include <string>
+#include <functional>
+#include <atomic>
+
 #include "leveldb/db.h"
 #include "db/dbformat.h"
 #include "db/skiplist.h"
@@ -28,9 +31,9 @@
 
   // Drop reference count.  Delete if no more references exist.
   void Unref() {
-    --refs_;
-    assert(refs_ >= 0);
-    if (refs_ <= 0) {
+    int v = std::atomic_fetch_sub(&refs_, 1);
+    assert(v >= 0);
+    if (v <= 0) {
       delete this;
     }
   }
@@ -62,6 +65,8 @@
   // in *status and return true.
   // Else, return false.
   bool Get(const LookupKey& key, std::string* value, Status* s);
+  bool Get(const LookupKey& key, const std::function<void(const Slice&)>
+  &get_value, Status* s);
 
  private:
   ~MemTable();  // Private since only Unref() should be used to delete it
@@ -77,7 +82,7 @@
   typedef SkipList<const char*, KeyComparator> Table;
 
   KeyComparator comparator_;
-  int refs_;
+  std::atomic_int refs_;
   Arena arena_;
   Table table_;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
index 40851b2..a81fe1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
@@ -7,6 +7,8 @@
 
 #include <stdint.h>
 #include <stdio.h>
+#include <functional>
+
 #include "leveldb/iterator.h"
 #include "leveldb/options.h"
 
@@ -83,6 +85,15 @@
   virtual Status Get(const ReadOptions& options,
                      const Slice& key, std::string* value) = 0;
 
+  // Get the value from a particular snapshot. The call only blocks if
+  // the value resides in the block cache or on the disk.
+  //
+  // May return some other Status on an error.
+  virtual Status SnapshotGet(const ReadOptions& options,
+                             const Slice& key,
+                             const std::function<void(const Slice&)>
+                             &get_value) = 0;
+
   // Return a heap-allocated iterator over the contents of the database.
   // The result of NewIterator() is initially invalid (caller must
   // call one of the Seek methods on the iterator before using it).
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
index 33604b8b..2cae088 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 #include <jni.h>
-
+#include <mutex>
 #undef JNIEXPORT
 #if _WIN32
 #define JNIEXPORT __declspec(dllexport)
@@ -33,11 +33,12 @@
 #include "org_apache_hadoop_hdfs_hdfsdb_WriteOptions.h"
 
 #include <leveldb/db.h>
+#include <leveldb/cache.h>
 #include <leveldb/options.h>
 #include <leveldb/write_batch.h>
 #include <leveldb/cache.h>
 
-static inline uintptr_t uintptr(void *ptr) {
+static inline uintptr_t uintptr(const void *ptr) {
   return reinterpret_cast<uintptr_t>(ptr);
 }
 
@@ -130,6 +131,29 @@
   return ToJByteArray(env, leveldb::Slice(result));
 }
 
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_snapshotGet(JNIEnv *env, jclass, jlong handle, jlong jread_options, jbyteArray jkey) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
+  jbyteArray res = NULL;
+  leveldb::Status status;
+  {
+    JNIByteArrayHolder<GetByteArrayCritical> key(env, jkey);
+    status = db->SnapshotGet(*options, key.slice(),
+     [env,&res](const leveldb::Slice &v) {
+      res = ToJByteArray(env, v);
+     });
+  }
+
+  if (status.IsNotFound()) {
+    return NULL;
+  } else if (!status.ok()) {
+    env->ThrowNew(env->FindClass("java/io/IOException"), status.ToString().c_str());
+    return NULL;
+  }
+
+  return res;
+}
+
 void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_write(JNIEnv *env, jclass, jlong handle, jlong jwrite_options, jlong jbatch) {
   leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
   leveldb::WriteOptions *options = reinterpret_cast<leveldb::WriteOptions*>(jwrite_options);
@@ -150,10 +174,33 @@
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newIterator(JNIEnv *, jclass, jlong handle, jlong jread_options) {
   leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
   leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
-  auto res = uintptr(db->NewIterator(*options));
+  uintptr_t res = uintptr(db->NewIterator(*options));
   return res;
 }
 
+jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newSnapshot(JNIEnv *, jclass, jlong handle) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  uintptr_t res = uintptr(db->GetSnapshot());
+  return res;
+}
+
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_releaseSnapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+  leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+  leveldb::Snapshot *s = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+  db->ReleaseSnapshot(s);
+}
+
+static std::mutex mutex;
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_getTest(JNIEnv *env,
+jclass, jlong handle, jbyteArray jkey) {
+  mutex.lock();
+  JNIByteArrayHolder<GetByteArrayElements> key(env, jkey);
+  std::string result;
+  result.resize(100);
+  mutex.unlock();
+  return ToJByteArray(env, leveldb::Slice(result));
+}
+
 void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Iterator_destruct(JNIEnv *, jclass, jlong handle) {
   delete reinterpret_cast<leveldb::Iterator*>(handle);
 }
@@ -212,6 +259,14 @@
   options->block_size = value;
 }
 
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Options_blockCacheSize(JNIEnv *, jclass, jlong handle, jlong value) {
+  leveldb::Options *options = reinterpret_cast<leveldb::Options*>(handle);
+  if (options->block_cache) {
+    delete options->block_cache;
+  }
+  options->block_cache = leveldb::NewLRUCache(value);
+}
+
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_construct(JNIEnv *, jclass) {
   return uintptr(new leveldb::ReadOptions());
 }
@@ -220,6 +275,11 @@
   delete reinterpret_cast<leveldb::ReadOptions*>(handle);
 }
 
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_snapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+  leveldb::ReadOptions *o = reinterpret_cast<leveldb::ReadOptions*>(handle);
+  o->snapshot = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+}
+
 jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_WriteOptions_construct(JNIEnv *, jclass) {
   return uintptr(new leveldb::WriteOptions());
 }