blob: 670cd6a627feba138d39a24794472986882e32f0 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.backend.store.raft;
import static com.baidu.hugegraph.backend.cache.AbstractCache.ACTION_INVALID;
import java.util.List;
import org.slf4j.Logger;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.error.RaftException;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.baidu.hugegraph.backend.BackendException;
import com.baidu.hugegraph.backend.serializer.BytesBuffer;
import com.baidu.hugegraph.backend.store.BackendAction;
import com.baidu.hugegraph.backend.store.BackendEntry;
import com.baidu.hugegraph.backend.store.BackendMutation;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.store.raft.RaftBackendStore.IncrCounter;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreAction;
import com.baidu.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.util.LZ4Util;
import com.baidu.hugegraph.util.Log;
public class StoreStateMachine extends StateMachineAdapter {
private static final Logger LOG = Log.logger(StoreStateMachine.class);
private final RaftSharedContext context;
private final StoreSnapshotFile snapshotFile;
public StoreStateMachine(RaftSharedContext context) {
this.context = context;
this.snapshotFile = new StoreSnapshotFile(context.stores());
}
private BackendStore store(StoreType type) {
return this.context.originStore(type);
}
private RaftNode node() {
return this.context.node();
}
private void updateCacheIfNeeded(BackendMutation mutation,
boolean forwarded) {
// Update cache only when graph run in general mode
if (this.context.graphMode() != GraphMode.NONE) {
return;
}
/*
* 1. Follower need to update cache from store to tx
* 2. If request come from leader, cache will be updated by upper layer
* 3. If request is forwarded by follower, need to update cache
*/
if (!forwarded && this.node().selfIsLeader()) {
return;
}
for (HugeType type : mutation.types()) {
if (!type.isGraph() && !type.isSchema()) {
continue;
}
for (java.util.Iterator<BackendAction> it = mutation.mutation(type);
it.hasNext();) {
BackendEntry entry = it.next().entry();
this.context.notifyCache(ACTION_INVALID, type, entry.originId());
}
}
}
@Override
public void onApply(Iterator iter) {
LOG.debug("Node role: {}", this.node().selfIsLeader() ?
"leader" : "follower");
StoreClosure closure = null;
try {
while (iter.hasNext()) {
closure = (StoreClosure) iter.done();
if (closure != null) {
// Leader just take it out from the closure
StoreCommand command = closure.command();
BytesBuffer buffer = BytesBuffer.wrap(command.data());
// The first two bytes are StoreType and StoreAction
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
boolean forwarded = command.forwarded();
// Let the producer thread to handle it
closure.complete(Status.OK(), () -> {
this.applyCommand(type, action, buffer, forwarded);
return null;
});
} else {
// Follower need readMutation data
byte[] bytes = iter.getData().array();
// Follower seems no way to wait future
// Let the backend thread do it directly
this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
try {
this.applyCommand(type, action, buffer, false);
} catch (Throwable e) {
LOG.error("Failed to execute backend command: {}",
action, e);
throw new BackendException("Backend error", e);
}
});
}
iter.next();
}
} catch (Throwable e) {
LOG.error("StateMachine occured critical error", e);
Status status = new Status(RaftError.ESTATEMACHINE,
"StateMachine occured critical error: %s",
e.getMessage());
if (closure != null) {
closure.failure(status, e);
}
// Will cause current node inactive
iter.setErrorAndRollback(1L, status);
}
}
private void applyCommand(StoreType type, StoreAction action,
BytesBuffer buffer, boolean forwarded) {
BackendStore store = type != StoreType.ALL ? this.store(type) : null;
switch (action) {
case CLEAR:
boolean clearSpace = buffer.read() > 0;
store.clear(clearSpace);
this.context.clearCache();
break;
case TRUNCATE:
store.truncate();
this.context.clearCache();
break;
case SNAPSHOT:
assert store == null;
this.node().snapshot();
break;
case BEGIN_TX:
store.beginTx();
break;
case COMMIT_TX:
List<BackendMutation> ms = StoreSerializer.readMutations(buffer);
// RaftBackendStore doesn't write raft log for beginTx
store.beginTx();
for (BackendMutation mutation : ms) {
store.mutate(mutation);
this.updateCacheIfNeeded(mutation, forwarded);
}
store.commitTx();
break;
case ROLLBACK_TX:
store.rollbackTx();
break;
// increase counter
case INCR_COUNTER:
IncrCounter counter = StoreSerializer.readIncrCounter(buffer);
store.increaseCounter(counter.type(), counter.increment());
break;
default:
throw new IllegalArgumentException("Invalid action " + action);
}
}
@Override
public void onSnapshotSave(SnapshotWriter writer, Closure done) {
LOG.debug("The node {} start snapshot save", this.node().nodeId());
this.snapshotFile.save(writer, done, this.context.snapshotExecutor());
}
@Override
public boolean onSnapshotLoad(SnapshotReader reader) {
if (this.node() != null && this.node().selfIsLeader()) {
LOG.warn("Leader is not supposed to load snapshot.");
return false;
}
return this.snapshotFile.load(reader);
}
@Override
public void onLeaderStart(long term) {
LOG.info("The node {} become to leader", this.node().nodeId());
this.node().onLeaderInfoChange(this.node().nodeId(), true);
super.onLeaderStart(term);
}
@Override
public void onLeaderStop(Status status) {
LOG.info("The node {} abdicated from leader", this.node().nodeId());
this.node().onLeaderInfoChange(null, false);
super.onLeaderStop(status);
}
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
LOG.info("The node {} become to follower", this.node().nodeId());
this.node().onLeaderInfoChange(ctx.getLeaderId(), false);
super.onStartFollowing(ctx);
}
@Override
public void onStopFollowing(LeaderChangeContext ctx) {
LOG.info("The node {} abdicated from follower", this.node().nodeId());
this.node().onLeaderInfoChange(null, false);
super.onStopFollowing(ctx);
}
@Override
public void onConfigurationCommitted(Configuration conf) {
super.onConfigurationCommitted(conf);
}
@Override
public void onError(final RaftException e) {
LOG.error("Raft error: {}", e.getMessage(), e);
}
}