Fix leader doesn't update cache for request forwarded by follower (#1279)
* also add snapshot operation in security check white list
* clear cache when truncate or clear backend
Change-Id: Ibb6b1e0966d1df3a77b96aa8f48c30cd29c1132a
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
index 8243b03..14d251e 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
@@ -34,6 +34,9 @@
public static final int DEFAULT_SIZE = 1 * MB;
public static final int MAX_INIT_CAP = 100 * MB;
+ public static final String ACTION_INVALID = "invalid";
+ public static final String ACTION_CLEAR = "clear";
+
protected static final Logger LOG = Log.logger(Cache.class);
private volatile long hits = 0L;
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java
index 056f2c3..1d1a5bc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java
@@ -19,6 +19,9 @@
package com.baidu.hugegraph.backend.cache;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -138,7 +141,7 @@
this.graph(), event);
event.checkArgs(String.class, HugeType.class, Id.class);
Object[] args = event.args();
- if ("invalid".equals(args[0])) {
+ if (ACTION_INVALID.equals(args[0])) {
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
if (type.isVertex()) {
@@ -153,7 +156,7 @@
this.edgesCache.clear();
}
return true;
- } else if ("clear".equals(args[0])) {
+ } else if (ACTION_CLEAR.equals(args[0])) {
this.verticesCache.clear();
this.edgesCache.clear();
return true;
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
index 20032c5..91926b5 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
@@ -19,6 +19,9 @@
package com.baidu.hugegraph.backend.cache;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;
+
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -104,7 +107,7 @@
this.graph(), event);
event.checkArgs(String.class, HugeType.class, Id.class);
Object[] args = event.args();
- if ("invalid".equals(args[0])) {
+ if (ACTION_INVALID.equals(args[0])) {
HugeType type = (HugeType) args[1];
Id id = (Id) args[2];
this.arrayCaches.remove(type, id);
@@ -122,7 +125,7 @@
this.nameCache.invalidate(prefixedName);
}
return true;
- } else if ("clear".equals(args[0])) {
+ } else if (ACTION_CLEAR.equals(args[0])) {
this.clearCache();
return true;
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java
index 7085bf4..3515f12 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java
@@ -145,6 +145,7 @@
@Override
public void beginTx() {
+ // Don't write raft log, commitTx(in statemachine) will call beginTx
}
@Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
index 45d7dc9..d76cb41 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
@@ -19,6 +19,8 @@
package com.baidu.hugegraph.backend.store.raft;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
+
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
@@ -255,7 +257,13 @@
return nodeOptions;
}
- public void notifyCache(HugeType type, Id id) {
+ public void clearCache() {
+ // Just choose two representatives used to represent schema and graph
+ this.notifyCache(ACTION_CLEAR, HugeType.VERTEX_LABEL, null);
+ this.notifyCache(ACTION_CLEAR, HugeType.VERTEX, null);
+ }
+
+ public void notifyCache(String action, HugeType type, Id id) {
EventHub eventHub;
if (type.isGraph()) {
eventHub = this.params.graphEventHub();
@@ -266,7 +274,7 @@
}
try {
// How to avoid update cache from server info
- eventHub.notify(Events.CACHE, "invalid", type, id);
+ eventHub.notify(Events.CACHE, action, type, id);
} catch (RejectedExecutionException e) {
LOG.warn("Can't update cache due to EventHub is too busy");
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java
index 3a7a899..f239d0f 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java
@@ -30,8 +30,14 @@
private final StoreType type;
private final StoreAction action;
private final byte[] data;
+ private final boolean forwarded;
public StoreCommand(StoreType type, StoreAction action, byte[] data) {
+ this(type, action, data, false);
+ }
+
+ public StoreCommand(StoreType type, StoreAction action,
+ byte[] data, boolean forwarded) {
this.type = type;
this.action = action;
if (data == null) {
@@ -42,6 +48,7 @@
}
this.data[0] = (byte) this.type.getNumber();
this.data[1] = (byte) this.action.getNumber();
+ this.forwarded = forwarded;
}
public StoreType type() {
@@ -56,6 +63,10 @@
return this.data;
}
+ public boolean forwarded() {
+ return this.forwarded;
+ }
+
public static void writeHeader(BytesBuffer buffer) {
buffer.write((byte) 0);
buffer.write((byte) 0);
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
index f51048b..670cd6a 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
@@ -19,10 +19,9 @@
package com.baidu.hugegraph.backend.store.raft;
-import java.util.EnumMap;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;
+
import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
import org.slf4j.Logger;
@@ -56,13 +55,10 @@
private final RaftSharedContext context;
private final StoreSnapshotFile snapshotFile;
- private final Map<StoreAction, BiConsumer<BackendStore, BytesBuffer>> funcs;
public StoreStateMachine(RaftSharedContext context) {
this.context = context;
this.snapshotFile = new StoreSnapshotFile(context.stores());
- this.funcs = new EnumMap<>(StoreAction.class);
- this.registerCommands();
}
private BackendStore store(StoreType type) {
@@ -73,45 +69,18 @@
return this.context.node();
}
- private void registerCommands() {
- // clear
- this.register(StoreAction.CLEAR, (store, buffer) -> {
- boolean clearSpace = buffer.read() > 0;
- store.clear(clearSpace);
- });
- this.register(StoreAction.TRUNCATE, (store, buffer) -> {
- store.truncate();
- });
- this.register(StoreAction.SNAPSHOT, (store, buffer) -> {
- assert store == null;
- this.node().snapshot();
- });
- this.register(StoreAction.BEGIN_TX, (store, buffer) -> store.beginTx());
- this.register(StoreAction.COMMIT_TX, (store, buffer) -> {
- List<BackendMutation> ms = StoreSerializer.readMutations(buffer);
- store.beginTx();
- for (BackendMutation mutation : ms) {
- store.mutate(mutation);
- // update cache on follower when graph run in general mode
- if (this.context.graphMode() == GraphMode.NONE) {
- this.updateCacheIfNeeded(mutation);
- }
- }
- store.commitTx();
- });
- this.register(StoreAction.ROLLBACK_TX, (store, buffer) -> {
- store.rollbackTx();
- });
- // increase counter
- this.register(StoreAction.INCR_COUNTER, (store, buffer) -> {
- IncrCounter counter = StoreSerializer.readIncrCounter(buffer);
- store.increaseCounter(counter.type(), counter.increment());
- });
- }
-
- private void updateCacheIfNeeded(BackendMutation mutation) {
- // Only follower need to update cache from store to tx
- if (this.node().selfIsLeader()) {
+ private void updateCacheIfNeeded(BackendMutation mutation,
+ boolean forwarded) {
+ // Update cache only when graph run in general mode
+ if (this.context.graphMode() != GraphMode.NONE) {
+ return;
+ }
+ /*
+ * 1. Follower need to update cache from store to tx
+ * 2. If request come from leader, cache will be updated by upper layer
+ * 3. If request is forwarded by follower, need to update cache
+ */
+ if (!forwarded && this.node().selfIsLeader()) {
return;
}
for (HugeType type : mutation.types()) {
@@ -121,16 +90,11 @@
for (java.util.Iterator<BackendAction> it = mutation.mutation(type);
it.hasNext();) {
BackendEntry entry = it.next().entry();
- this.context.notifyCache(type, entry.originId());
+ this.context.notifyCache(ACTION_INVALID, type, entry.originId());
}
}
}
- private void register(StoreAction action,
- BiConsumer<BackendStore, BytesBuffer> func) {
- this.funcs.put(action, func);
- }
-
@Override
public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
@@ -141,13 +105,16 @@
closure = (StoreClosure) iter.done();
if (closure != null) {
// Leader just take it out from the closure
- BytesBuffer buffer = BytesBuffer.wrap(closure.command().data());
+ StoreCommand command = closure.command();
+ BytesBuffer buffer = BytesBuffer.wrap(command.data());
// The first two bytes are StoreType and StoreAction
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
+ boolean forwarded = command.forwarded();
// Let the producer thread to handle it
closure.complete(Status.OK(), () -> {
- return this.applyCommand(type, action, buffer);
+ this.applyCommand(type, action, buffer, forwarded);
+ return null;
});
} else {
// Follower need readMutation data
@@ -161,7 +128,7 @@
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
try {
- this.applyCommand(type, action, buffer);
+ this.applyCommand(type, action, buffer, false);
} catch (Throwable e) {
LOG.error("Failed to execute backend command: {}",
action, e);
@@ -184,12 +151,47 @@
}
}
- private Object applyCommand(StoreType type, StoreAction action,
- BytesBuffer buffer) {
+ private void applyCommand(StoreType type, StoreAction action,
+ BytesBuffer buffer, boolean forwarded) {
BackendStore store = type != StoreType.ALL ? this.store(type) : null;
- BiConsumer<BackendStore, BytesBuffer> func = this.funcs.get(action);
- func.accept(store, buffer);
- return null;
+ switch (action) {
+ case CLEAR:
+ boolean clearSpace = buffer.read() > 0;
+ store.clear(clearSpace);
+ this.context.clearCache();
+ break;
+ case TRUNCATE:
+ store.truncate();
+ this.context.clearCache();
+ break;
+ case SNAPSHOT:
+ assert store == null;
+ this.node().snapshot();
+ break;
+ case BEGIN_TX:
+ store.beginTx();
+ break;
+ case COMMIT_TX:
+ List<BackendMutation> ms = StoreSerializer.readMutations(buffer);
+ // RaftBackendStore doesn't write raft log for beginTx
+ store.beginTx();
+ for (BackendMutation mutation : ms) {
+ store.mutate(mutation);
+ this.updateCacheIfNeeded(mutation, forwarded);
+ }
+ store.commitTx();
+ break;
+ case ROLLBACK_TX:
+ store.rollbackTx();
+ break;
+ // increase counter
+ case INCR_COUNTER:
+ IncrCounter counter = StoreSerializer.readIncrCounter(buffer);
+ store.increaseCounter(counter.type(), counter.increment());
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid action " + action);
+ }
}
@Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java
index 167fc68..50964d4 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java
@@ -76,6 +76,6 @@
StoreType type = request.getType();
StoreAction action = request.getAction();
byte[] data = request.getData().toByteArray();
- return new StoreCommand(type, action, data);
+ return new StoreCommand(type, action, data, true);
}
}
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
index 55ea2e4..4ccf9e9 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
@@ -102,6 +102,12 @@
"com.baidu.hugegraph.backend.store.hbase.HbaseSessions$RowIterator"
);
+ private static final Set<String> RAFT_CLASSES = ImmutableSet.of(
+ "com.baidu.hugegraph.backend.store.raft.RaftNode",
+ "com.baidu.hugegraph.backend.store.raft.StoreStateMachine",
+ "com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder"
+ );
+
@Override
public void checkPermission(Permission permission) {
if (DENIED_PERMISSIONS.contains(permission.getName()) &&
@@ -143,7 +149,7 @@
if (callFromGremlin() && !callFromCaffeine() &&
!callFromAsyncTasks() && !callFromEventHubNotify() &&
!callFromBackendThread() && !callFromBackendHbase() &&
- !callFromRaftMethods()) {
+ !callFromRaft()) {
throw newSecurityException(
"Not allowed to access thread via Gremlin");
}
@@ -155,7 +161,7 @@
if (callFromGremlin() && !callFromCaffeine() &&
!callFromAsyncTasks() && !callFromEventHubNotify() &&
!callFromBackendThread() && !callFromBackendHbase() &&
- !callFromRaftMethods()) {
+ !callFromRaft()) {
throw newSecurityException(
"Not allowed to access thread group via Gremlin");
}
@@ -182,7 +188,8 @@
@Override
public void checkRead(FileDescriptor fd) {
- if (callFromGremlin() && !callFromBackendSocket()) {
+ if (callFromGremlin() && !callFromBackendSocket() &&
+ !callFromRaft()) {
throw newSecurityException("Not allowed to read fd via Gremlin");
}
super.checkRead(fd);
@@ -191,7 +198,8 @@
@Override
public void checkRead(String file) {
if (callFromGremlin() && !callFromCaffeine() &&
- !readGroovyInCurrentDir(file) && !callFromBackendHbase()) {
+ !readGroovyInCurrentDir(file) && !callFromBackendHbase() &&
+ !callFromRaft()) {
throw newSecurityException(
"Not allowed to read file via Gremlin: %s", file);
}
@@ -200,7 +208,7 @@
@Override
public void checkRead(String file, Object context) {
- if (callFromGremlin()) {
+ if (callFromGremlin() && !callFromRaft()) {
throw newSecurityException(
"Not allowed to read file via Gremlin: %s", file);
}
@@ -209,7 +217,8 @@
@Override
public void checkWrite(FileDescriptor fd) {
- if (callFromGremlin() && !callFromBackendSocket()) {
+ if (callFromGremlin() && !callFromBackendSocket() &&
+ !callFromRaft()) {
throw newSecurityException("Not allowed to write fd via Gremlin");
}
super.checkWrite(fd);
@@ -217,7 +226,7 @@
@Override
public void checkWrite(String file) {
- if (callFromGremlin()) {
+ if (callFromGremlin() && !callFromRaft()) {
throw newSecurityException("Not allowed to write file via Gremlin");
}
super.checkWrite(file);
@@ -253,7 +262,7 @@
@Override
public void checkConnect(String host, int port) {
if (callFromGremlin() && !callFromBackendSocket() &&
- !callFromBackendHbase()) {
+ !callFromBackendHbase() && !callFromRaft()) {
throw newSecurityException(
"Not allowed to connect socket via Gremlin");
}
@@ -308,7 +317,7 @@
public void checkPropertyAccess(String key) {
if (!callFromAcceptClassLoaders() && callFromGremlin() &&
!WHITE_SYSTEM_PROPERTYS.contains(key) && !callFromBackendHbase() &&
- !callFromRaftMethods()) {
+ !callFromRaft()) {
throw newSecurityException(
"Not allowed to access system property(%s) via Gremlin", key);
}
@@ -428,9 +437,8 @@
return callFromWorkerWithClass(HBASE_CLASSES);
}
- private static boolean callFromRaftMethods() {
- return callFromMethod("com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder",
- "forwardToLeader");
+ private static boolean callFromRaft() {
+ return callFromWorkerWithClass(RAFT_CLASSES);
}
private static boolean callFromWorkerWithClass(Set<String> classes) {