WIP: Snapshot-based reads and cache tuning for the LevelDB-backed partial namespace.

Read-only transactions now pin a LevelDB snapshot in begin() instead of
taking the FSDirectory read lock. Lookups that hit the memtables go
through a new DB::SnapshotGet() that stays off the DB mutex, and
MemTable reference counting is switched to std::atomic so Ref()/Unref()
are safe without the lock. The memtable write buffer and block cache
sizes become configurable via dfs.partialns.writebuffer and
dfs.partialns.blockcache, and a LevelDBProfile harness is added to
measure getFileInfo() throughput.
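The intended read path, as a sketch (it assumes a newROTransaction()
factory analogous to the newRWTransaction() used below, which this
patch does not show):

    try (ROTransaction tx = fsd.newROTransaction().begin()) {
      // begin() pins a LevelDB snapshot instead of fsd.readLock(), so
      // readers never block concurrent writers.
      FlatINode inode = tx.getINode(inodeId);
    } // close() releases the snapshot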
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 0e50d8c..b3c6083 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -348,7 +348,17 @@
this.enableLevelDb = conf.getBoolean("dfs.partialns", false);
if (enableLevelDb) {
String dbPath = conf.get("dfs.partialns.path");
- Options options = new Options().createIfMissing(true);
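+ // Tunables for the LevelDB-backed namespace: the memtable write buffer
+ // (default 4 MB) and an optional block cache, disabled when sized 0.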
+ int writeBufferSize = conf.getInt("dfs.partialns.writebuffer",
+ 4096 * 1024);
+ long blockCacheSize = conf.getLong(
+ "dfs.partialns.blockcache", 0);
+ Options options = new Options().createIfMissing(true)
+ .writeBufferSize(writeBufferSize);
+
+ if (blockCacheSize != 0) {
+ options.blockCacheSize(blockCacheSize);
+ }
+
this.levelDb = org.apache.hadoop.hdfs.hdfsdb.DB.open(options, dbPath);
try (RWTransaction tx = newRWTransaction().begin()) {
tx.putINode(ROOT_INODE_ID, createRootForFlatNS(ns));
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
index f55ed63..50d8c30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBROTransaction.java
@@ -29,37 +29,43 @@
class LevelDBROTransaction extends ROTransaction {
private final org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb;
- private static final ReadOptions OPTIONS = new ReadOptions();
+
+ private Snapshot snapshot;
+ private final ReadOptions options = new ReadOptions();
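+ // OPTIONS carries no snapshot and serves as the sentinel meaning "read
+ // the latest state"; see the options == OPTIONS checks below.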
+ public static final ReadOptions OPTIONS = new ReadOptions();
+
LevelDBROTransaction(FSDirectory fsd, org.apache.hadoop.hdfs.hdfsdb.DB db) {
super(fsd);
this.hdfsdb = db;
}
LevelDBROTransaction begin() {
- fsd.readLock();
+ snapshot = hdfsdb.snapshot();
+ options.snapshot(snapshot);
return this;
}
@Override
FlatINode getINode(long id) {
- return getFlatINode(id, hdfsdb);
+ return getFlatINode(id, hdfsdb, options);
}
@Override
long getChild(long parentId, ByteBuffer localName) {
- return getChild(parentId, localName, hdfsdb);
+ return getChild(parentId, localName, hdfsdb, options);
}
@Override
DBChildrenView childrenView(long parent) {
- return getChildrenView(parent, hdfsdb);
+ return getChildrenView(parent, hdfsdb, options);
}
static FlatINode getFlatINode(
- long id, org.apache.hadoop.hdfs.hdfsdb.DB hdfsdb) {
+ long id, DB hdfsdb, ReadOptions options) {
byte[] key = inodeKey(id);
try {
- byte[] bytes = hdfsdb.get(OPTIONS, key);
+ byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key)
+ : hdfsdb.snapshotGet(options, key);
if (bytes == null) {
return null;
}
@@ -83,11 +89,13 @@
};
}
- static long getChild(long parentId, ByteBuffer localName, DB hdfsdb) {
+ static long getChild(
+ long parentId, ByteBuffer localName, DB hdfsdb, ReadOptions options) {
Preconditions.checkArgument(localName.hasRemaining());
byte[] key = inodeChildKey(parentId, localName);
try {
- byte[] bytes = hdfsdb.get(OPTIONS, key);
+ byte[] bytes = options == OPTIONS ? hdfsdb.get(options, key)
+ : hdfsdb.snapshotGet(options, key);
if (bytes == null) {
return INVALID_INODE_ID;
}
@@ -109,7 +117,8 @@
return key;
}
- static DBChildrenView getChildrenView(long parent, DB hdfsdb) {
+ static DBChildrenView getChildrenView(
+ long parent, DB hdfsdb, ReadOptions options) {
byte[] key = new byte[]{'I',
(byte) ((parent >> 56) & 0xff),
(byte) ((parent >> 48) & 0xff),
@@ -121,9 +130,17 @@
(byte) (parent & 0xff),
1
};
- Iterator it = hdfsdb.iterator(OPTIONS);
+ Iterator it = hdfsdb.iterator(options);
it.seek(key);
return new LevelDBChildrenView(parent, it);
}
+ @Override
+ public void close() throws IOException {
+ try {
+ if (snapshot != null) {
+ snapshot.close();
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ options.close();
+ }
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
index 3f14cff..4c8a2d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBRWTransaction.java
@@ -37,17 +37,20 @@
@Override
FlatINode getINode(long id) {
- return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+ return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+ LevelDBROTransaction.OPTIONS);
}
@Override
long getChild(long parentId, ByteBuffer localName) {
- return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+ return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+ LevelDBROTransaction.OPTIONS);
}
@Override
DBChildrenView childrenView(long parent) {
- return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+ return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+ LevelDBROTransaction.OPTIONS);
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
index e1b8eff..d486010 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LevelDBReplayTransaction.java
@@ -38,17 +38,20 @@
@Override
FlatINode getINode(long id) {
- return LevelDBROTransaction.getFlatINode(id, hdfsdb);
+ return LevelDBROTransaction.getFlatINode(id, hdfsdb,
+ LevelDBROTransaction.OPTIONS);
}
@Override
long getChild(long parentId, ByteBuffer localName) {
- return LevelDBROTransaction.getChild(parentId, localName, hdfsdb);
+ return LevelDBROTransaction.getChild(parentId, localName, hdfsdb,
+ LevelDBROTransaction.OPTIONS);
}
@Override
DBChildrenView childrenView(long parent) {
- return LevelDBROTransaction.getChildrenView(parent, hdfsdb);
+ return LevelDBROTransaction.getChildrenView(parent, hdfsdb,
+ LevelDBROTransaction.OPTIONS);
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
index 3278111..743bf0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/MemDBChildrenView.java
@@ -31,6 +31,10 @@
@Override
public Iterator<Map.Entry<ByteBuffer, Long>> iterator() {
- return childrenMap.tailMap(start).entrySet().iterator();
+ if (start == null) {
+ return childrenMap.entrySet().iterator();
+ } else {
+ return childrenMap.tailMap(start).entrySet().iterator();
+ }
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
new file mode 100644
index 0000000..51c4dba
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/LevelDBProfile.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.hdfsdb.*;
+import org.apache.log4j.Level;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * Microbenchmark for the LevelDB-backed partial namespace: creates one
+ * empty file on a MiniDFSCluster, then issues getFileInfo("/foo") from a
+ * fixed thread pool and reports the elapsed wall-clock time.
+ */
+public class LevelDBProfile {
+ private static final String DB_PATH = "/Users/hmai/work/test/partialnsdb";
+ private static final int TIMES = 300000;
+ public static void main(String[] args) throws Exception {
+ MiniDFSCluster cluster = null;
+ Configuration conf = new HdfsConfiguration();
+ conf.setBoolean("dfs.partialns", true);
+ conf.set("dfs.partialns.path", DB_PATH);
+ conf.setInt("dfs.partialns.writebuffer", 8388608 * 16);
+ conf.setLong("dfs.partialns.blockcache", 4294967296L);
+ ExecutorService executor = Executors.newFixedThreadPool(8, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "Executor");
+ }
+ });
+ ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
+
+ try {
+ FileUtils.deleteDirectory(new File(DB_PATH));
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ final DistributedFileSystem fs = cluster.getFileSystem();
+ final org.apache.hadoop.hdfs.hdfsdb.DB db = cluster.getNamesystem().getFSDirectory().getLevelDb();
+ final Path PATH = new Path("/foo");
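+ // Create an empty /foo so getFileInfo() has an inode to resolve.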
+ try (OutputStream os = fs.create(PATH)) {
+ }
+ cluster.shutdownDataNodes();
+ final FSNamesystem fsn = cluster.getNamesystem();
+ final Runnable getFileStatus = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ fsn.getFileInfo("/foo", true);
+ //fs.getFileStatus(PATH);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ long start = monotonicNow();
+ for (int i = 0; i < TIMES; ++i) {
+ executor.submit(getFileStatus);
+ }
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.HOURS);
+ long end = monotonicNow();
+ System.err.println("Time: " + (end - start) + " ms");
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
index 0355dcc..57ec71e 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/DB.java
@@ -21,7 +21,8 @@
public class DB extends NativeObject {
public static DB open(Options options, String path) throws IOException {
- return new DB(open(options.nativeHandle(), path));
+ long handle = open(options.nativeHandle(), path);
+ return new DB(handle);
}
@Override
@@ -36,6 +37,11 @@
return get(nativeHandle, options.nativeHandle(), key);
}
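+ /**
+ * Read {@code key} as of the snapshot carried by {@code options};
+ * returns {@code null} when the key is absent.
+ */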
+ public byte[] snapshotGet(ReadOptions options, byte[] key)
+ throws IOException {
+ return snapshotGet(nativeHandle, options.nativeHandle(), key);
+ }
+
public void write(WriteOptions options, WriteBatch batch) throws IOException {
write(nativeHandle, options.nativeHandle(), batch.nativeHandle());
}
@@ -52,6 +58,14 @@
return new Iterator(newIterator(nativeHandle, options.nativeHandle()));
}
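+ /**
+ * Take a point-in-time snapshot of the database. The caller must
+ * close() it to release the native snapshot.
+ */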
+ public Snapshot snapshot() {
+ return new Snapshot(nativeHandle, newSnapshot(nativeHandle));
+ }
+
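+ /**
+ * Test-only hook: round-trips a key through JNI and returns a fixed
+ * buffer without querying the database; used to measure JNI overhead.
+ */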
+ public byte[] dbGetTest(byte[] key) throws IOException {
+ return getTest(nativeHandle, key);
+ }
+
private DB(long handle) {
super(handle);
}
@@ -60,6 +74,8 @@
private static native void close(long handle);
private static native byte[] get(long handle, long options,
byte[] key) throws IOException;
+ private static native byte[] snapshotGet(long handle, long options,
+ byte[] key) throws IOException;
private static native void write(long handle, long options,
long batch) throws IOException;
private static native void put(long handle, long options,
@@ -67,4 +83,9 @@
private static native void delete(long handle, long options,
byte[] key);
private static native long newIterator(long handle, long readOptions);
+ private static native long newSnapshot(long handle);
+ static native void releaseSnapshot(long handle, long snapshotHandle);
+
+ private static native byte[] getTest(long handle, byte[] key) throws IOException;
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
index a12da61..626e4d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Options.java
@@ -56,6 +56,11 @@
return this;
}
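+ /**
+ * Replace the block cache with an LRU cache of {@code capacity} bytes.
+ */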
+ public Options blockCacheSize(long capacity) {
+ blockCacheSize(nativeHandle, capacity);
+ return this;
+ }
+
@Override
public void close() {
if (nativeHandle != 0) {
@@ -70,4 +75,5 @@
private static native void compressionType(long handle, int value);
private static native void writeBufferSize(long handle, int value);
private static native void blockSize(long handle, int value);
+ private static native void blockCacheSize(long handle, long capacity);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
index e97e05f..b2e6726 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/ReadOptions.java
@@ -22,6 +22,11 @@
super(construct());
}
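+ /**
+ * Pin subsequent reads to {@code snapshot} so they observe a consistent
+ * view of the database.
+ */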
+ public ReadOptions snapshot(Snapshot snapshot) {
+ snapshot(nativeHandle, snapshot.nativeHandle);
+ return this;
+ }
+
@Override
public void close() {
if (nativeHandle != 0) {
@@ -32,4 +37,5 @@
private static native long construct();
private static native void destruct(long handle);
+ private static native void snapshot(long handle, long snapshot);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
new file mode 100644
index 0000000..ad370b3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/java/org/apache/hadoop/hdfs/hdfsdb/Snapshot.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.hdfsdb;
+
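+/**
+ * Java wrapper around a native LevelDB snapshot. Closing releases it via
+ * DB.releaseSnapshot(); close() is idempotent.
+ */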
+public class Snapshot extends NativeObject {
+ private final long dbHandle;
+ Snapshot(long dbHandle, long nativeHandle) {
+ super(nativeHandle);
+ this.dbHandle = dbHandle;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (nativeHandle != 0) {
+ DB.releaseSnapshot(dbHandle, nativeHandle);
+ nativeHandle = 0;
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
index 1225412..2d0bef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.cc
@@ -1121,6 +1121,52 @@
return s;
}
+Status DBImpl::SnapshotGet(const ReadOptions& options,
+ const Slice& key,
+ const std::function<void(const Slice&)> &get_value) {
+ Status s;
+ assert(options.snapshot);
+ SequenceNumber snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
+ LookupKey lkey(key, snapshot);
+
+ // Load mem_/imm_ without taking the DB mutex; MemTable reference counts
+ // are atomic, so Ref()/Unref() are safe off-lock (aligned pointer loads
+ // are assumed atomic on the target platform).
+ MemTable* mem = mem_;
+ MemTable* imm = imm_;
+ mem->Ref();
+ if (imm != NULL) imm->Ref();
+
+ // First look in the memtable, then in the immutable memtable (if any).
+ if (mem->Get(lkey, get_value, &s)) {
+ // Done
+ } else if (imm != NULL && imm->Get(lkey, get_value, &s)) {
+ // Done
+ } else {
+ // Fall back to the on-disk tables under the DB mutex.
+ mutex_.Lock();
+ Version* current = versions_->current();
+ current->Ref();
+ // Unlock while reading from files and memtables
+ mutex_.Unlock();
+ Version::GetStats stats;
+ std::string value;
+ s = current->Get(options, lkey, &value, &stats);
+ if (s.ok()) {
+ get_value(Slice(value));
+ }
+ mutex_.Lock();
+ if (current->UpdateStats(stats)) {
+ MaybeScheduleCompaction();
+ }
+ current->Unref();
+ mutex_.Unlock();
+ }
+
+ mem->Unref();
+ if (imm != NULL) imm->Unref();
+ return s;
+}
+
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
uint32_t seed;
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
index cfc9981..21a6c5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_impl.h
@@ -35,6 +35,10 @@
virtual Status Get(const ReadOptions& options,
const Slice& key,
std::string* value);
+ virtual Status SnapshotGet(const ReadOptions& options,
+ const Slice& key,
+ const std::function<void(const Slice&)>& get_value);
virtual Iterator* NewIterator(const ReadOptions&);
virtual const Snapshot* GetSnapshot();
virtual void ReleaseSnapshot(const Snapshot* snapshot);
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
index 280b01c..3462fe7 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/db_test.cc
@@ -1848,6 +1848,14 @@
assert(false); // Not implemented
return Status::NotFound(key);
}
+
+ virtual Status SnapshotGet(const ReadOptions& options,
+ const Slice& key,
+ const std::function<void(const Slice&)>&) {
+ assert(false); // Not implemented
+ return Status::NotFound(key);
+ }
+
virtual Iterator* NewIterator(const ReadOptions& options) {
if (options.snapshot == NULL) {
KVMap* saved = new KVMap;
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
index bfec0a7..9cabcae 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.cc
@@ -106,6 +106,12 @@
}
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
+ return Get(key, [value](const Slice &v) {
+ value->assign(v.data(), v.size());
+ }, s);
+}
+
+bool MemTable::Get(const LookupKey& key,
+ const std::function<void(const Slice&)> &get_value, Status* s) {
Slice memkey = key.memtable_key();
Table::Iterator iter(&table_);
iter.Seek(memkey.data());
@@ -130,7 +136,7 @@
switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: {
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
- value->assign(v.data(), v.size());
+ get_value(v);
return true;
}
case kTypeDeletion:
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
index 92e90bb..31835bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/db/memtable.h
@@ -6,6 +6,9 @@
#define STORAGE_LEVELDB_DB_MEMTABLE_H_
#include <string>
+#include <functional>
+#include <atomic>
+
#include "leveldb/db.h"
#include "db/dbformat.h"
#include "db/skiplist.h"
@@ -28,9 +31,9 @@
// Drop reference count. Delete if no more references exist.
void Unref() {
- --refs_;
- assert(refs_ >= 0);
- if (refs_ <= 0) {
+ // atomic_fetch_sub returns the value *before* the decrement, so the
+ // holder of the last reference observes v == 1.
+ int v = std::atomic_fetch_sub(&refs_, 1);
+ assert(v >= 1);
+ if (v == 1) {
delete this;
}
}
@@ -62,6 +65,8 @@
// in *status and return true.
// Else, return false.
bool Get(const LookupKey& key, std::string* value, Status* s);
+ bool Get(const LookupKey& key,
+ const std::function<void(const Slice&)>& get_value, Status* s);
private:
~MemTable(); // Private since only Unref() should be used to delete it
@@ -77,7 +82,7 @@
typedef SkipList<const char*, KeyComparator> Table;
KeyComparator comparator_;
- int refs_;
+ std::atomic_int refs_;
Arena arena_;
Table table_;
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
index 40851b2..a81fe1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/hdfsdb/include/leveldb/db.h
@@ -7,6 +7,8 @@
#include <stdint.h>
#include <stdio.h>
+#include <functional>
+
#include "leveldb/iterator.h"
#include "leveldb/options.h"
@@ -83,6 +85,15 @@
virtual Status Get(const ReadOptions& options,
const Slice& key, std::string* value) = 0;
+ // Read the value for "key" as of the snapshot set in options.snapshot
+ // (which must be non-NULL), invoking get_value on the value when found.
+ // The call only blocks when the value is absent from the memtables and
+ // has to be fetched from the block cache or the disk.
+ //
+ // May return some other Status on an error.
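+ //
+ // A minimal usage sketch (error handling elided):
+ //
+ //   leveldb::ReadOptions ro;
+ //   ro.snapshot = db->GetSnapshot();
+ //   std::string copy;
+ //   db->SnapshotGet(ro, key, [&copy](const leveldb::Slice& v) {
+ //     copy.assign(v.data(), v.size());
+ //   });
+ //   db->ReleaseSnapshot(ro.snapshot);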
+ virtual Status SnapshotGet(const ReadOptions& options,
+ const Slice& key,
+ const std::function<void(const Slice&)>& get_value) = 0;
+
// Return a heap-allocated iterator over the contents of the database.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
diff --git a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
index 33604b8b..2cae088 100644
--- a/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
+++ b/hadoop-hdfs-project/hadoop-hdfsdb/src/main/native/jni/bindings.cc
@@ -16,7 +16,7 @@
* limitations under the License.
*/
#include <jni.h>
-
+#include <mutex>
#undef JNIEXPORT
#if _WIN32
#define JNIEXPORT __declspec(dllexport)
@@ -33,11 +33,12 @@
#include "org_apache_hadoop_hdfs_hdfsdb_WriteOptions.h"
#include <leveldb/db.h>
#include <leveldb/options.h>
#include <leveldb/write_batch.h>
#include <leveldb/cache.h>
-static inline uintptr_t uintptr(void *ptr) {
+static inline uintptr_t uintptr(const void *ptr) {
return reinterpret_cast<uintptr_t>(ptr);
}
@@ -130,6 +131,29 @@
return ToJByteArray(env, leveldb::Slice(result));
}
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_snapshotGet(JNIEnv *env, jclass, jlong handle, jlong jread_options, jbyteArray jkey) {
+ leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+ leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
+ jbyteArray res = NULL;
+ leveldb::Status status;
+ {
+ // Use GetByteArrayElements here: the callback allocates a Java array,
+ // and JNI calls are forbidden while a critical array section is held.
+ JNIByteArrayHolder<GetByteArrayElements> key(env, jkey);
+ status = db->SnapshotGet(*options, key.slice(),
+ [env,&res](const leveldb::Slice &v) {
+ res = ToJByteArray(env, v);
+ });
+ }
+
+ if (status.IsNotFound()) {
+ return NULL;
+ } else if (!status.ok()) {
+ env->ThrowNew(env->FindClass("java/io/IOException"), status.ToString().c_str());
+ return NULL;
+ }
+
+ return res;
+}
+
void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_write(JNIEnv *env, jclass, jlong handle, jlong jwrite_options, jlong jbatch) {
leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
leveldb::WriteOptions *options = reinterpret_cast<leveldb::WriteOptions*>(jwrite_options);
@@ -150,10 +174,33 @@
jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newIterator(JNIEnv *, jclass, jlong handle, jlong jread_options) {
leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
leveldb::ReadOptions *options = reinterpret_cast<leveldb::ReadOptions*>(jread_options);
- auto res = uintptr(db->NewIterator(*options));
+ uintptr_t res = uintptr(db->NewIterator(*options));
return res;
}
+jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_newSnapshot(JNIEnv *, jclass, jlong handle) {
+ leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+ uintptr_t res = uintptr(db->GetSnapshot());
+ return res;
+}
+
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_releaseSnapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+ leveldb::DB *db = reinterpret_cast<leveldb::DB*>(handle);
+ leveldb::Snapshot *s = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+ db->ReleaseSnapshot(s);
+}
+
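+// Test-only stub backing DB#dbGetTest: pins the key and returns a fixed
+// 100-byte buffer without touching LevelDB, for profiling raw JNI cost.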
+static std::mutex mutex;
+jbyteArray JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_DB_getTest(
+    JNIEnv *env, jclass, jlong handle, jbyteArray jkey) {
+ mutex.lock();
+ JNIByteArrayHolder<GetByteArrayElements> key(env, jkey);
+ std::string result;
+ result.resize(100);
+ mutex.unlock();
+ return ToJByteArray(env, leveldb::Slice(result));
+}
+
void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Iterator_destruct(JNIEnv *, jclass, jlong handle) {
delete reinterpret_cast<leveldb::Iterator*>(handle);
}
@@ -212,6 +259,14 @@
options->block_size = value;
}
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_Options_blockCacheSize(JNIEnv *, jclass, jlong handle, jlong value) {
+ leveldb::Options *options = reinterpret_cast<leveldb::Options*>(handle);
+ if (options->block_cache) {
+ delete options->block_cache;
+ }
+ options->block_cache = leveldb::NewLRUCache(value);
+}
+
jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_construct(JNIEnv *, jclass) {
return uintptr(new leveldb::ReadOptions());
}
@@ -220,6 +275,11 @@
delete reinterpret_cast<leveldb::ReadOptions*>(handle);
}
+void JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_ReadOptions_snapshot(JNIEnv *, jclass, jlong handle, jlong snapshot) {
+ leveldb::ReadOptions *o = reinterpret_cast<leveldb::ReadOptions*>(handle);
+ o->snapshot = reinterpret_cast<leveldb::Snapshot*>(snapshot);
+}
+
jlong JNICALL Java_org_apache_hadoop_hdfs_hdfsdb_WriteOptions_construct(JNIEnv *, jclass) {
return uintptr(new leveldb::WriteOptions());
}