Fix leader doesn't update cache for request forwarded by follower (#1279)

* also add snapshot operation in security check white list
* clear cache when truncate or clear backend

Change-Id: Ibb6b1e0966d1df3a77b96aa8f48c30cd29c1132a
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
index 8243b03..14d251e 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/AbstractCache.java
@@ -34,6 +34,9 @@
     public static final int DEFAULT_SIZE = 1 * MB;
     public static final int MAX_INIT_CAP = 100 * MB;
 
+    public static final String ACTION_INVALID = "invalid";
+    public static final String ACTION_CLEAR = "clear";
+
     protected static final Logger LOG = Log.logger(Cache.class);
 
     private volatile long hits = 0L;
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java
index 056f2c3..1d1a5bc 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedGraphTransaction.java
@@ -19,6 +19,9 @@
 
 package com.baidu.hugegraph.backend.cache;
 
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -138,7 +141,7 @@
                       this.graph(), event);
             event.checkArgs(String.class, HugeType.class, Id.class);
             Object[] args = event.args();
-            if ("invalid".equals(args[0])) {
+            if (ACTION_INVALID.equals(args[0])) {
                 HugeType type = (HugeType) args[1];
                 Id id = (Id) args[2];
                 if (type.isVertex()) {
@@ -153,7 +156,7 @@
                     this.edgesCache.clear();
                 }
                 return true;
-            } else if ("clear".equals(args[0])) {
+            } else if (ACTION_CLEAR.equals(args[0])) {
                 this.verticesCache.clear();
                 this.edgesCache.clear();
                 return true;
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
index 20032c5..91926b5 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/cache/CachedSchemaTransaction.java
@@ -19,6 +19,9 @@
 
 package com.baidu.hugegraph.backend.cache;
 
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
@@ -104,7 +107,7 @@
                       this.graph(), event);
             event.checkArgs(String.class, HugeType.class, Id.class);
             Object[] args = event.args();
-            if ("invalid".equals(args[0])) {
+            if (ACTION_INVALID.equals(args[0])) {
                 HugeType type = (HugeType) args[1];
                 Id id = (Id) args[2];
                 this.arrayCaches.remove(type, id);
@@ -122,7 +125,7 @@
                     this.nameCache.invalidate(prefixedName);
                 }
                 return true;
-            } else if ("clear".equals(args[0])) {
+            } else if (ACTION_CLEAR.equals(args[0])) {
                 this.clearCache();
                 return true;
             }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java
index 7085bf4..3515f12 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftBackendStore.java
@@ -145,6 +145,7 @@
 
     @Override
     public void beginTx() {
+        // Don't write raft log, commitTx(in statemachine) will call beginTx
     }
 
     @Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
index 45d7dc9..d76cb41 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/RaftSharedContext.java
@@ -19,6 +19,8 @@
 
 package com.baidu.hugegraph.backend.store.raft;
 
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_CLEAR;
+
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -255,7 +257,13 @@
         return nodeOptions;
     }
 
-    public void notifyCache(HugeType type, Id id) {
+    public void clearCache() {
+        // Just choose two representatives used to represent schema and graph
+        this.notifyCache(ACTION_CLEAR, HugeType.VERTEX_LABEL, null);
+        this.notifyCache(ACTION_CLEAR, HugeType.VERTEX, null);
+    }
+
+    public void notifyCache(String action, HugeType type, Id id) {
         EventHub eventHub;
         if (type.isGraph()) {
             eventHub = this.params.graphEventHub();
@@ -266,7 +274,7 @@
         }
         try {
             // How to avoid update cache from server info
-            eventHub.notify(Events.CACHE, "invalid", type, id);
+            eventHub.notify(Events.CACHE, action, type, id);
         } catch (RejectedExecutionException e) {
             LOG.warn("Can't update cache due to EventHub is too busy");
         }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java
index 3a7a899..f239d0f 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreCommand.java
@@ -30,8 +30,14 @@
     private final StoreType type;
     private final StoreAction action;
     private final byte[] data;
+    private final boolean forwarded;
 
     public StoreCommand(StoreType type, StoreAction action, byte[] data) {
+        this(type, action, data, false);
+    }
+
+    public StoreCommand(StoreType type, StoreAction action,
+                        byte[] data, boolean forwarded) {
         this.type = type;
         this.action = action;
         if (data == null) {
@@ -42,6 +48,7 @@
         }
         this.data[0] = (byte) this.type.getNumber();
         this.data[1] = (byte) this.action.getNumber();
+        this.forwarded = forwarded;
     }
 
     public StoreType type() {
@@ -56,6 +63,10 @@
         return this.data;
     }
 
+    public boolean forwarded() {
+        return this.forwarded;
+    }
+
     public static void writeHeader(BytesBuffer buffer) {
         buffer.write((byte) 0);
         buffer.write((byte) 0);
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
index f51048b..670cd6a 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/StoreStateMachine.java
@@ -19,10 +19,9 @@
 
 package com.baidu.hugegraph.backend.store.raft;
 
-import java.util.EnumMap;
+import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;
+
 import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
 
 import org.slf4j.Logger;
 
@@ -56,13 +55,10 @@
 
     private final RaftSharedContext context;
     private final StoreSnapshotFile snapshotFile;
-    private final Map<StoreAction, BiConsumer<BackendStore, BytesBuffer>> funcs;
 
     public StoreStateMachine(RaftSharedContext context) {
         this.context = context;
         this.snapshotFile = new StoreSnapshotFile(context.stores());
-        this.funcs = new EnumMap<>(StoreAction.class);
-        this.registerCommands();
     }
 
     private BackendStore store(StoreType type) {
@@ -73,45 +69,18 @@
         return this.context.node();
     }
 
-    private void registerCommands() {
-        // clear
-        this.register(StoreAction.CLEAR, (store, buffer) -> {
-            boolean clearSpace = buffer.read() > 0;
-            store.clear(clearSpace);
-        });
-        this.register(StoreAction.TRUNCATE, (store, buffer) -> {
-            store.truncate();
-        });
-        this.register(StoreAction.SNAPSHOT, (store, buffer) -> {
-            assert store == null;
-            this.node().snapshot();
-        });
-        this.register(StoreAction.BEGIN_TX, (store, buffer) -> store.beginTx());
-        this.register(StoreAction.COMMIT_TX, (store, buffer) -> {
-            List<BackendMutation> ms = StoreSerializer.readMutations(buffer);
-            store.beginTx();
-            for (BackendMutation mutation : ms) {
-                store.mutate(mutation);
-                // update cache on follower when graph run in general mode
-                if (this.context.graphMode() == GraphMode.NONE) {
-                    this.updateCacheIfNeeded(mutation);
-                }
-            }
-            store.commitTx();
-        });
-        this.register(StoreAction.ROLLBACK_TX, (store, buffer) -> {
-            store.rollbackTx();
-        });
-        // increase counter
-        this.register(StoreAction.INCR_COUNTER, (store, buffer) -> {
-            IncrCounter counter = StoreSerializer.readIncrCounter(buffer);
-            store.increaseCounter(counter.type(), counter.increment());
-        });
-    }
-
-    private void updateCacheIfNeeded(BackendMutation mutation) {
-        // Only follower need to update cache from store to tx
-        if (this.node().selfIsLeader()) {
+    private void updateCacheIfNeeded(BackendMutation mutation,
+                                     boolean forwarded) {
+        // Update cache only when graph run in general mode
+        if (this.context.graphMode() != GraphMode.NONE) {
+            return;
+        }
+        /*
+         * 1. Follower need to update cache from store to tx
+         * 2. If request come from leader, cache will be updated by upper layer
+         * 3. If request is forwarded by follower, need to update cache
+         */
+        if (!forwarded && this.node().selfIsLeader()) {
             return;
         }
         for (HugeType type : mutation.types()) {
@@ -121,16 +90,11 @@
             for (java.util.Iterator<BackendAction> it = mutation.mutation(type);
                  it.hasNext();) {
                 BackendEntry entry = it.next().entry();
-                this.context.notifyCache(type, entry.originId());
+                this.context.notifyCache(ACTION_INVALID, type, entry.originId());
             }
         }
     }
 
-    private void register(StoreAction action,
-                          BiConsumer<BackendStore, BytesBuffer> func) {
-        this.funcs.put(action, func);
-    }
-
     @Override
     public void onApply(Iterator iter) {
         LOG.debug("Node role: {}", this.node().selfIsLeader() ?
@@ -141,13 +105,16 @@
                 closure = (StoreClosure) iter.done();
                 if (closure != null) {
                     // Leader just take it out from the closure
-                    BytesBuffer buffer = BytesBuffer.wrap(closure.command().data());
+                    StoreCommand command = closure.command();
+                    BytesBuffer buffer = BytesBuffer.wrap(command.data());
                     // The first two bytes are StoreType and StoreAction
                     StoreType type = StoreType.valueOf(buffer.read());
                     StoreAction action = StoreAction.valueOf(buffer.read());
+                    boolean forwarded = command.forwarded();
                     // Let the producer thread to handle it
                     closure.complete(Status.OK(), () -> {
-                        return this.applyCommand(type, action, buffer);
+                        this.applyCommand(type, action, buffer, forwarded);
+                        return null;
                     });
                 } else {
                     // Follower need readMutation data
@@ -161,7 +128,7 @@
                         StoreType type = StoreType.valueOf(buffer.read());
                         StoreAction action = StoreAction.valueOf(buffer.read());
                         try {
-                            this.applyCommand(type, action, buffer);
+                            this.applyCommand(type, action, buffer, false);
                         } catch (Throwable e) {
                             LOG.error("Failed to execute backend command: {}",
                                       action, e);
@@ -184,12 +151,47 @@
         }
     }
 
-    private Object applyCommand(StoreType type, StoreAction action,
-                                BytesBuffer buffer) {
+    private void applyCommand(StoreType type, StoreAction action,
+                              BytesBuffer buffer, boolean forwarded) {
         BackendStore store = type != StoreType.ALL ? this.store(type) : null;
-        BiConsumer<BackendStore, BytesBuffer> func = this.funcs.get(action);
-        func.accept(store, buffer);
-        return null;
+        switch (action) {
+            case CLEAR:
+                boolean clearSpace = buffer.read() > 0;
+                store.clear(clearSpace);
+                this.context.clearCache();
+                break;
+            case TRUNCATE:
+                store.truncate();
+                this.context.clearCache();
+                break;
+            case SNAPSHOT:
+                assert store == null;
+                this.node().snapshot();
+                break;
+            case BEGIN_TX:
+                store.beginTx();
+                break;
+            case COMMIT_TX:
+                List<BackendMutation> ms = StoreSerializer.readMutations(buffer);
+                // RaftBackendStore doesn't write raft log for beginTx
+                store.beginTx();
+                for (BackendMutation mutation : ms) {
+                    store.mutate(mutation);
+                    this.updateCacheIfNeeded(mutation, forwarded);
+                }
+                store.commitTx();
+                break;
+            case ROLLBACK_TX:
+                store.rollbackTx();
+                break;
+            // increase counter
+            case INCR_COUNTER:
+                IncrCounter counter = StoreSerializer.readIncrCounter(buffer);
+                store.increaseCounter(counter.type(), counter.increment());
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid action " + action);
+        }
     }
 
     @Override
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java
index 167fc68..50964d4 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/backend/store/raft/rpc/StoreCommandProcessor.java
@@ -76,6 +76,6 @@
         StoreType type = request.getType();
         StoreAction action = request.getAction();
         byte[] data = request.getData().toByteArray();
-        return new StoreCommand(type, action, data);
+        return new StoreCommand(type, action, data, true);
     }
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
index 55ea2e4..4ccf9e9 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/security/HugeSecurityManager.java
@@ -102,6 +102,12 @@
             "com.baidu.hugegraph.backend.store.hbase.HbaseSessions$RowIterator"
     );
 
+    private static final Set<String> RAFT_CLASSES = ImmutableSet.of(
+            "com.baidu.hugegraph.backend.store.raft.RaftNode",
+            "com.baidu.hugegraph.backend.store.raft.StoreStateMachine",
+            "com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder"
+    );
+
     @Override
     public void checkPermission(Permission permission) {
         if (DENIED_PERMISSIONS.contains(permission.getName()) &&
@@ -143,7 +149,7 @@
         if (callFromGremlin() && !callFromCaffeine() &&
             !callFromAsyncTasks() && !callFromEventHubNotify() &&
             !callFromBackendThread() && !callFromBackendHbase() &&
-            !callFromRaftMethods()) {
+            !callFromRaft()) {
             throw newSecurityException(
                   "Not allowed to access thread via Gremlin");
         }
@@ -155,7 +161,7 @@
         if (callFromGremlin() && !callFromCaffeine() &&
             !callFromAsyncTasks() && !callFromEventHubNotify() &&
             !callFromBackendThread() && !callFromBackendHbase() &&
-            !callFromRaftMethods()) {
+            !callFromRaft()) {
             throw newSecurityException(
                   "Not allowed to access thread group via Gremlin");
         }
@@ -182,7 +188,8 @@
 
     @Override
     public void checkRead(FileDescriptor fd) {
-        if (callFromGremlin() && !callFromBackendSocket()) {
+        if (callFromGremlin() && !callFromBackendSocket() &&
+            !callFromRaft()) {
             throw newSecurityException("Not allowed to read fd via Gremlin");
         }
         super.checkRead(fd);
@@ -191,7 +198,8 @@
     @Override
     public void checkRead(String file) {
         if (callFromGremlin() && !callFromCaffeine() &&
-            !readGroovyInCurrentDir(file) && !callFromBackendHbase()) {
+            !readGroovyInCurrentDir(file) && !callFromBackendHbase() &&
+            !callFromRaft()) {
             throw newSecurityException(
                   "Not allowed to read file via Gremlin: %s", file);
         }
@@ -200,7 +208,7 @@
 
     @Override
     public void checkRead(String file, Object context) {
-        if (callFromGremlin()) {
+        if (callFromGremlin() && !callFromRaft()) {
             throw newSecurityException(
                   "Not allowed to read file via Gremlin: %s", file);
         }
@@ -209,7 +217,8 @@
 
     @Override
     public void checkWrite(FileDescriptor fd) {
-        if (callFromGremlin() && !callFromBackendSocket()) {
+        if (callFromGremlin() && !callFromBackendSocket() &&
+            !callFromRaft()) {
             throw newSecurityException("Not allowed to write fd via Gremlin");
         }
         super.checkWrite(fd);
@@ -217,7 +226,7 @@
 
     @Override
     public void checkWrite(String file) {
-        if (callFromGremlin()) {
+        if (callFromGremlin() && !callFromRaft()) {
             throw newSecurityException("Not allowed to write file via Gremlin");
         }
         super.checkWrite(file);
@@ -253,7 +262,7 @@
     @Override
     public void checkConnect(String host, int port) {
         if (callFromGremlin() && !callFromBackendSocket() &&
-            !callFromBackendHbase()) {
+            !callFromBackendHbase() && !callFromRaft()) {
             throw newSecurityException(
                   "Not allowed to connect socket via Gremlin");
         }
@@ -308,7 +317,7 @@
     public void checkPropertyAccess(String key) {
         if (!callFromAcceptClassLoaders() && callFromGremlin() &&
             !WHITE_SYSTEM_PROPERTYS.contains(key) && !callFromBackendHbase() &&
-            !callFromRaftMethods()) {
+            !callFromRaft()) {
             throw newSecurityException(
                   "Not allowed to access system property(%s) via Gremlin", key);
         }
@@ -428,9 +437,8 @@
         return callFromWorkerWithClass(HBASE_CLASSES);
     }
 
-    private static boolean callFromRaftMethods() {
-        return callFromMethod("com.baidu.hugegraph.backend.store.raft.rpc.RpcForwarder",
-                              "forwardToLeader");
+    private static boolean callFromRaft() {
+        return callFromWorkerWithClass(RAFT_CLASSES);
     }
 
     private static boolean callFromWorkerWithClass(Set<String> classes) {