blob: 62c5b4d37a8d9ada8c46d26a6ce849f4274896fa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.raft;
import java.util.Set;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import com.alipay.remoting.rpc.RpcServer;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreAction;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.Log;
import com.google.common.collect.ImmutableSet;
public class RaftBackendStoreProvider implements BackendStoreProvider {
private static final Logger LOG = Log.logger(RaftBackendStoreProvider.class);
private final BackendStoreProvider provider;
private final RaftContext context;
private RaftBackendStore schemaStore;
private RaftBackendStore graphStore;
private RaftBackendStore systemStore;
public RaftBackendStoreProvider(HugeGraphParams params,
BackendStoreProvider provider) {
this.provider = provider;
this.schemaStore = null;
this.graphStore = null;
this.systemStore = null;
this.context = new RaftContext(params);
}
public RaftGroupManager raftNodeManager() {
return this.context().raftNodeManager();
}
private RaftContext context() {
if (this.context == null) {
E.checkState(false, "Please ensure init raft context");
}
return this.context;
}
private Set<RaftBackendStore> stores() {
return ImmutableSet.of(this.schemaStore, this.graphStore,
this.systemStore);
}
private void checkOpened() {
E.checkState(this.graph() != null &&
this.schemaStore != null &&
this.graphStore != null &&
this.systemStore != null,
"The RaftBackendStoreProvider has not been opened");
}
private void checkNonSharedStore(BackendStore store) {
E.checkArgument(!store.features().supportsSharedStorage(),
"Can't enable raft mode with %s backend",
this.type());
}
@Override
public String type() {
return this.provider.type();
}
@Override
public String driverVersion() {
return this.provider.driverVersion();
}
@Override
public String storedVersion() {
return this.provider.storedVersion();
}
@Override
public String graph() {
return this.provider.graph();
}
@Override
public synchronized BackendStore loadSchemaStore(HugeConfig config) {
if (this.schemaStore == null) {
LOG.info("Init raft backend schema store");
BackendStore store = this.provider.loadSchemaStore(config);
this.checkNonSharedStore(store);
this.schemaStore = new RaftBackendStore(store, this.context());
this.context().addStore(StoreType.SCHEMA, this.schemaStore);
}
return this.schemaStore;
}
@Override
public synchronized BackendStore loadGraphStore(HugeConfig config) {
if (this.graphStore == null) {
LOG.info("Init raft backend graph store");
BackendStore store = this.provider.loadGraphStore(config);
this.checkNonSharedStore(store);
this.graphStore = new RaftBackendStore(store, this.context());
this.context().addStore(StoreType.GRAPH, this.graphStore);
}
return this.graphStore;
}
@Override
public synchronized BackendStore loadSystemStore(HugeConfig config) {
if (this.systemStore == null) {
LOG.info("Init raft backend system store");
BackendStore store = this.provider.loadSystemStore(config);
this.checkNonSharedStore(store);
this.systemStore = new RaftBackendStore(store, this.context());
this.context().addStore(StoreType.SYSTEM, this.systemStore);
}
return this.systemStore;
}
@Override
public void open(String name) {
this.provider.open(name);
}
@Override
public void waitReady(RpcServer rpcServer) {
this.context().initRaftNode(rpcServer);
LOG.info("The raft node is initialized");
this.context().waitRaftNodeStarted();
LOG.info("The raft store is started");
}
@Override
public void close() {
this.provider.close();
if (this.context != null) {
this.context.close();
}
}
@Override
public void init() {
this.checkOpened();
for (RaftBackendStore store : this.stores()) {
store.init();
}
this.notifyAndWaitEvent(Events.STORE_INIT);
LOG.debug("Graph '{}' store has been initialized", this.graph());
}
@Override
public void clear() {
this.checkOpened();
for (RaftBackendStore store : this.stores()) {
// Just clear tables of store, not clear space
store.clear(false);
}
for (RaftBackendStore store : this.stores()) {
// Only clear space of store
store.clear(true);
}
this.notifyAndWaitEvent(Events.STORE_CLEAR);
LOG.debug("Graph '{}' store has been cleared", this.graph());
}
@Override
public void truncate() {
this.checkOpened();
for (RaftBackendStore store : this.stores()) {
store.truncate();
}
this.notifyAndWaitEvent(Events.STORE_TRUNCATE);
LOG.debug("Graph '{}' store has been truncated", this.graph());
/*
* Take the initiative to generate a snapshot, it can avoid this
* situation: when the server restart need to read the database
* (such as checkBackendVersionInfo), it happens that raft replays
* the truncate log, at the same time, the store has been cleared
* (truncate) but init-store has not been completed, which will
* cause reading errors.
* When restarting, load the snapshot first and then read backend,
* will not encounter such an intermediate state.
*/
this.createSnapshot();
}
@Override
public boolean initialized() {
return this.provider.initialized() && this.context != null;
}
@Override
public void createSnapshot() {
// TODO: snapshot for StoreType.ALL instead of StoreType.GRAPH
StoreCommand command = new StoreCommand(StoreType.GRAPH,
StoreAction.SNAPSHOT, null);
RaftStoreClosure closure = new RaftStoreClosure(command);
RaftClosure<?> future = this.context().node().submitAndWait(command,
closure);
E.checkState(future != null, "The snapshot future can't be null");
try {
future.waitFinished();
LOG.debug("Graph '{}' has writed snapshot", this.graph());
} catch (Throwable e) {
throw new BackendException("Failed to create snapshot", e);
}
}
@Override
public void onCloneConfig(HugeConfig config, String newGraph) {
this.provider.onCloneConfig(config, newGraph);
}
@Override
public void onDeleteConfig(HugeConfig config) {
this.provider.onDeleteConfig(config);
}
@Override
public void resumeSnapshot() {
// Jraft doesn't expose API to load snapshot
throw new UnsupportedOperationException("resumeSnapshot");
}
@Override
public void listen(EventListener listener) {
this.provider.listen(listener);
}
@Override
public void unlisten(EventListener listener) {
this.provider.unlisten(listener);
}
@Override
public EventHub storeEventHub() {
return this.provider.storeEventHub();
}
protected final void notifyAndWaitEvent(String event) {
Future<?> future = this.storeEventHub().notify(event, this);
try {
future.get();
} catch (Throwable e) {
LOG.warn("Error when waiting for event execution: {}", event, e);
}
}
}