blob: 6f74bed6a08a278e7495914a0692a5e16b393e4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.raft;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.Cache;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.store.BackendAction;
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
/**
 * Shared per-graph context for the raft-based backend store.
 *
 * <p>Holds the raft group configuration, the raft node itself, the RPC
 * server/forwarder and the executor pools used for read-index, snapshot
 * and backend tasks. One instance exists per graph (the graph name is
 * used as the raft group name, see {@link #group()}).</p>
 *
 * <p>Lifecycle: construct, then {@link #initRaftNode}, then optionally
 * {@link #waitRaftNodeStarted}, and finally {@link #close}. Not designed
 * for concurrent initialization; field access after init is read-mostly.</p>
 */
public final class RaftContext {

    private static final Logger LOG = Log.logger(RaftContext.class);

    // Timeout/interval constants, unit is ms
    public static final int NO_TIMEOUT = -1;
    public static final int POLL_INTERVAL = 5000;
    public static final int WAIT_RAFTLOG_TIMEOUT = 30 * 60 * 1000;
    public static final int WAIT_LEADER_TIMEOUT = 10 * 60 * 1000;
    public static final int BUSY_MIN_SLEEP_FACTOR = 3 * 1000;
    public static final int BUSY_MAX_SLEEP_FACTOR = 5 * 1000;
    public static final int WAIT_RPC_TIMEOUT = 30 * 60 * 1000;
    public static final int LOG_WARN_INTERVAL = 60 * 1000;

    // Compress block size used when transferring snapshot data
    public static final int BLOCK_SIZE = (int) (Bytes.KB * 8);

    // Work queue size for the executor pools created by newPool()
    public static final int QUEUE_SIZE = CoreOptions.CPUS;
    public static final long KEEP_ALIVE_SECOND = 300L;

    private final HugeGraphParams params;
    private final Configuration groupPeers;
    // Indexed by StoreType ordinal, see addStore()/originStore()
    private final RaftBackendStore[] stores;
    // Null unless RAFT_SAFE_READ is enabled
    private final ExecutorService readIndexExecutor;
    private final ExecutorService snapshotExecutor;
    private final ExecutorService backendExecutor;

    // The following fields stay null until initRaftNode() is called
    private RpcServer raftRpcServer;
    private PeerId endpoint;
    private RaftNode raftNode;
    private RaftGroupManager raftGroupManager;
    private RpcForwarder rpcForwarder;

    /**
     * Creates the context from graph parameters: parses the raft peer list,
     * allocates the per-store-type slots and the executor pools.
     *
     * @param params the graph parameters holding the configuration
     * @throws HugeException if `raft.group_peers` can't be parsed
     */
    public RaftContext(HugeGraphParams params) {
        this.params = params;
        HugeConfig config = params.configuration();

        /*
         * NOTE: `raft.group_peers` option is transferred from ServerConfig
         * to CoreConfig, since it's shared by all graphs.
         */
        String groupPeersString = config.getString("raft.group_peers");
        E.checkArgument(groupPeersString != null,
                        "Please ensure config `raft.group_peers` in raft mode");
        this.groupPeers = new Configuration();
        if (!this.groupPeers.parse(groupPeersString)) {
            throw new HugeException("Failed to parse raft.group_peers: '%s'",
                                    groupPeersString);
        }

        this.stores = new RaftBackendStore[StoreType.ALL.getNumber()];

        // Read-index executor is only needed for linearizable (safe) reads
        if (config.get(CoreOptions.RAFT_SAFE_READ)) {
            int threads = config.get(CoreOptions.RAFT_READ_INDEX_THREADS);
            this.readIndexExecutor = this.createReadIndexExecutor(threads);
        } else {
            this.readIndexExecutor = null;
        }

        int threads = config.get(CoreOptions.RAFT_SNAPSHOT_THREADS);
        this.snapshotExecutor = this.createSnapshotExecutor(threads);

        threads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
        this.backendExecutor = this.createBackendExecutor(threads);

        CompressStrategyManager.init(config);

        this.raftRpcServer = null;
        this.endpoint = null;
        this.raftNode = null;
        this.raftGroupManager = null;
        this.rpcForwarder = null;
    }

    /**
     * Wraps the shared bolt RPC server, registers the raft request
     * processors and starts the raft node for this graph's group.
     *
     * @param rpcServer the already-started bolt RPC server shared by graphs
     */
    public void initRaftNode(com.alipay.remoting.rpc.RpcServer rpcServer) {
        this.raftRpcServer = this.wrapRpcServer(rpcServer);
        this.endpoint = new PeerId(rpcServer.ip(), rpcServer.port());
        this.registerRpcRequestProcessors();
        LOG.info("Start raft server successfully: {}", this.endpoint());

        this.raftNode = new RaftNode(this);
        this.rpcForwarder = new RpcForwarder(this.raftNode.node());
        this.raftGroupManager = new RaftGroupManagerImpl(this);
    }

    /**
     * Blocks until a leader is elected and the local raft log is synced.
     * Leader election is bounded by WAIT_LEADER_TIMEOUT; log sync waits
     * without timeout.
     */
    public void waitRaftNodeStarted() {
        RaftNode node = this.node();
        node.waitLeaderElected(RaftContext.WAIT_LEADER_TIMEOUT);
        node.waitRaftLogSynced(RaftContext.NO_TIMEOUT);
    }

    /**
     * Shuts down the raft node, the RPC server registration and the
     * executor pools owned by this context. Safe to call even if
     * {@link #initRaftNode} was never invoked.
     */
    public void close() {
        LOG.info("Stop raft server: {}", this.endpoint());
        RaftNode node = this.node();
        if (node != null) {
            node.shutdown();
        }
        this.shutdownRpcServer();
        // FIX: release the thread pools created in the constructor so that
        // close() actually frees all resources owned by this context
        if (this.readIndexExecutor != null) {
            this.readIndexExecutor.shutdown();
        }
        this.snapshotExecutor.shutdown();
        this.backendExecutor.shutdown();
    }

    public RaftNode node() {
        return this.raftNode;
    }

    protected RpcServer rpcServer() {
        return this.raftRpcServer;
    }

    protected RpcForwarder rpcForwarder() {
        return this.rpcForwarder;
    }

    public RaftGroupManager raftNodeManager() {
        return this.raftGroupManager;
    }

    public String group() {
        // Use graph name as group name
        return this.params.name();
    }

    /**
     * Registers a raft-wrapped store under its store type slot.
     */
    public void addStore(StoreType type, RaftBackendStore store) {
        this.stores[type.getNumber()] = store;
    }

    /**
     * Maps a store name (schema/graph/system) to its {@link StoreType}.
     */
    public StoreType storeType(String store) {
        if (BackendStoreProvider.SCHEMA_STORE.equals(store)) {
            return StoreType.SCHEMA;
        } else if (BackendStoreProvider.GRAPH_STORE.equals(store)) {
            return StoreType.GRAPH;
        } else {
            assert BackendStoreProvider.SYSTEM_STORE.equals(store);
            return StoreType.SYSTEM;
        }
    }

    protected RaftBackendStore[] stores() {
        return this.stores;
    }

    /**
     * Returns the underlying (non-raft) store for the given type.
     *
     * @throws IllegalStateException if no store was registered for the type
     */
    public BackendStore originStore(StoreType storeType) {
        RaftBackendStore raftStore = this.stores[storeType.getNumber()];
        E.checkState(raftStore != null,
                     "The raft store of type %s shouldn't be null", storeType);
        return raftStore.originStore();
    }

    /**
     * Builds jraft {@link NodeOptions} from the graph configuration and
     * creates the log/meta/snapshot directories under `raft.path`.
     *
     * @throws IOException if any of the raft directories can't be created
     */
    public NodeOptions nodeOptions() throws IOException {
        HugeConfig config = this.config();

        NodeOptions nodeOptions = new NodeOptions();
        nodeOptions.setEnableMetrics(false);
        nodeOptions.setRpcProcessorThreadPoolSize(
                    config.get(CoreOptions.RAFT_RPC_THREADS));
        nodeOptions.setRpcConnectTimeoutMs(
                    config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT));
        // Config values are in seconds, jraft expects milliseconds
        nodeOptions.setRpcDefaultTimeout(
                    1000 * config.get(CoreOptions.RAFT_RPC_TIMEOUT));
        nodeOptions.setRpcInstallSnapshotTimeout(
                    1000 * config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT));

        int electionTimeout = config.get(CoreOptions.RAFT_ELECTION_TIMEOUT);
        nodeOptions.setElectionTimeoutMs(electionTimeout);
        nodeOptions.setDisableCli(false);

        int snapshotInterval = config.get(CoreOptions.RAFT_SNAPSHOT_INTERVAL);
        nodeOptions.setSnapshotIntervalSecs(snapshotInterval);
        nodeOptions.setInitialConf(this.groupPeers);

        String raftPath = config.get(CoreOptions.RAFT_PATH);
        String logUri = Paths.get(raftPath, "log").toString();
        FileUtils.forceMkdir(new File(logUri));
        nodeOptions.setLogUri(logUri);

        String metaUri = Paths.get(raftPath, "meta").toString();
        FileUtils.forceMkdir(new File(metaUri));
        nodeOptions.setRaftMetaUri(metaUri);

        String snapshotUri = Paths.get(raftPath, "snapshot").toString();
        FileUtils.forceMkdir(new File(snapshotUri));
        nodeOptions.setSnapshotUri(snapshotUri);

        RaftOptions raftOptions = nodeOptions.getRaftOptions();
        /*
         * NOTE: if buffer size is too small(<=1024), will throw exception
         * "LogManager is busy, disk queue overload"
         */
        raftOptions.setApplyBatch(config.get(CoreOptions.RAFT_APPLY_BATCH));
        raftOptions.setDisruptorBufferSize(
                    config.get(CoreOptions.RAFT_QUEUE_SIZE));
        raftOptions.setDisruptorPublishEventWaitTimeoutSecs(
                    config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT));
        raftOptions.setReplicatorPipeline(
                    config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE));
        raftOptions.setOpenStatistics(false);
        raftOptions.setReadOnlyOptions(
                    ReadOnlyOption.valueOf(
                    config.get(CoreOptions.RAFT_READ_STRATEGY)));

        return nodeOptions;
    }

    protected void clearCache() {
        // Just choose two representatives used to represent schema and graph
        this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX_LABEL, null);
        this.notifyCache(Cache.ACTION_CLEAR, HugeType.VERTEX, null);
    }

    /**
     * Invalidates cached schema/graph entries touched by a replicated
     * mutation, unless this node is the leader handling its own request
     * (then the upper layer updates the cache itself).
     *
     * @param mutation  the applied backend mutation
     * @param forwarded whether the request was forwarded from a follower
     */
    protected void updateCacheIfNeeded(BackendMutation mutation,
                                       boolean forwarded) {
        // Update cache only when graph run in general mode
        if (this.graphMode() != GraphMode.NONE) {
            return;
        }
        /*
         * 1. If Follower, need to update cache from store to tx
         * 2. If Leader, request is forwarded by follower, need to update cache
         * 3. If Leader, request comes from leader, don't need to update cache,
         *    because the cache will be updated by upper layer
         */
        if (!forwarded && this.node().selfIsLeader()) {
            return;
        }
        for (HugeType type : mutation.types()) {
            List<Id> ids = new ArrayList<>((int) Query.COMMIT_BATCH);
            if (type.isSchema() || type.isGraph()) {
                java.util.Iterator<BackendAction> it = mutation.mutation(type);
                while (it.hasNext()) {
                    ids.add(it.next().entry().originId());
                }
                this.notifyCache(Cache.ACTION_INVALID, type, ids);
            } else {
                // Ignore other types due to not cached them
            }
        }
    }

    /**
     * Notifies the schema or graph event hub to apply a cache action.
     * Drops the notification (with a warning) if the hub is saturated.
     *
     * @param action the cache action, e.g. clear or invalidate
     * @param type   decides whether the schema or graph hub is notified
     * @param ids    ids to invalidate, or null to act on the whole type
     */
    protected void notifyCache(String action, HugeType type, List<Id> ids) {
        EventHub eventHub;
        if (type.isGraph()) {
            eventHub = this.params.graphEventHub();
        } else if (type.isSchema()) {
            eventHub = this.params.schemaEventHub();
        } else {
            return;
        }
        try {
            // How to avoid update cache from server info
            if (ids == null) {
                eventHub.call(Events.CACHE, action, type);
            } else {
                if (ids.size() == 1) {
                    eventHub.call(Events.CACHE, action, type, ids.get(0));
                } else {
                    eventHub.call(Events.CACHE, action, type, ids.toArray());
                }
            }
        } catch (RejectedExecutionException e) {
            // Deliberate best-effort: a dropped invalidation only delays
            // cache freshness, it must not fail the raft apply path
            LOG.warn("Can't update cache due to EventHub is too busy");
        }
    }

    public PeerId endpoint() {
        return this.endpoint;
    }

    public boolean safeRead() {
        return this.config().get(CoreOptions.RAFT_SAFE_READ);
    }

    public ExecutorService snapshotExecutor() {
        return this.snapshotExecutor;
    }

    public ExecutorService backendExecutor() {
        return this.backendExecutor;
    }

    public ExecutorService readIndexExecutor() {
        return this.readIndexExecutor;
    }

    public GraphMode graphMode() {
        return this.params.mode();
    }

    private HugeConfig config() {
        return this.params.configuration();
    }

    /**
     * Alternative to {@link #wrapRpcServer}: creates and starts a dedicated
     * raft RPC server instead of reusing the shared bolt server. Kept for
     * reference/debugging, currently unused.
     */
    @SuppressWarnings("unused")
    private RpcServer initAndStartRpcServer() {
        Integer lowWaterMark = this.config().get(
                               CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
        System.setProperty("bolt.channel_write_buf_low_water_mark",
                           String.valueOf(lowWaterMark));
        Integer highWaterMark = this.config().get(
                                CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
        System.setProperty("bolt.channel_write_buf_high_water_mark",
                           String.valueOf(highWaterMark));
        PeerId endpoint = this.endpoint();
        NodeManager.getInstance().addAddress(endpoint.getEndpoint());
        RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(
                              endpoint.getEndpoint());
        LOG.info("Raft-RPC server is started successfully");
        return rpcServer;
    }

    /**
     * Wraps the shared bolt RPC server as a jraft {@link RpcServer} and
     * registers the standard raft request processors on it.
     */
    private RpcServer wrapRpcServer(com.alipay.remoting.rpc.RpcServer rpcServer) {
        // TODO: pass ServerOptions instead of CoreOptions, to share by graphs
        Integer lowWaterMark = this.config().get(
                               CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
        System.setProperty("bolt.channel_write_buf_low_water_mark",
                           String.valueOf(lowWaterMark));
        Integer highWaterMark = this.config().get(
                                CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
        System.setProperty("bolt.channel_write_buf_high_water_mark",
                           String.valueOf(highWaterMark));
        // Reference from RaftRpcServerFactory.createAndStartRaftRpcServer
        RpcServer raftRpcServer = new BoltRpcServer(rpcServer);
        RaftRpcServerFactory.addRaftRequestProcessors(raftRpcServer);
        return raftRpcServer;
    }

    private void shutdownRpcServer() {
        // FIX: raftRpcServer/endpoint are null if initRaftNode() was never
        // called (e.g. startup failed early); close() must not throw NPE then
        if (this.raftRpcServer != null) {
            this.raftRpcServer.shutdown();
        }
        PeerId endpoint = this.endpoint();
        if (endpoint != null) {
            NodeManager.getInstance().removeAddress(endpoint.getEndpoint());
        }
    }

    private void registerRpcRequestProcessors() {
        this.raftRpcServer.registerProcessor(new AddPeerProcessor(this));
        this.raftRpcServer.registerProcessor(new RemovePeerProcessor(this));
        this.raftRpcServer.registerProcessor(new StoreCommandProcessor(this));
        this.raftRpcServer.registerProcessor(new SetLeaderProcessor(this));
        this.raftRpcServer.registerProcessor(new ListPeersProcessor(this));
    }

    private ExecutorService createReadIndexExecutor(int coreThreads) {
        int maxThreads = coreThreads << 2;
        String name = "store-read-index-callback";
        // Abort on saturation: read-index callbacks must not block callers
        RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
        return newPool(coreThreads, maxThreads, name, handler);
    }

    private ExecutorService createSnapshotExecutor(int coreThreads) {
        int maxThreads = coreThreads << 2;
        String name = "store-snapshot-executor";
        RejectedExecutionHandler handler;
        handler = new ThreadPoolExecutor.CallerRunsPolicy();
        return newPool(coreThreads, maxThreads, name, handler);
    }

    private ExecutorService createBackendExecutor(int threads) {
        String name = "store-backend-executor";
        RejectedExecutionHandler handler =
                                 new ThreadPoolExecutor.CallerRunsPolicy();
        return newPool(threads, threads, name, handler);
    }

    /**
     * Builds a bounded thread pool with daemon threads and the given
     * rejection policy; queue capacity is {@link #QUEUE_SIZE}.
     */
    private static ExecutorService newPool(int coreThreads, int maxThreads,
                                           String name,
                                           RejectedExecutionHandler handler) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
        return ThreadPoolUtil.newBuilder()
                             .poolName(name)
                             .enableMetric(false)
                             .coreThreads(coreThreads)
                             .maximumThreads(maxThreads)
                             .keepAliveSeconds(KEEP_ALIVE_SECOND)
                             .workQueue(queue)
                             .threadFactory(new NamedThreadFactory(name, true))
                             .rejectedHandler(handler)
                             .build();
    }
}