blob: db37d0a4bd3773a02ef61a39c95f06f50094e0b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hugegraph.analyzer.Analyzer;
import org.apache.hugegraph.analyzer.AnalyzerFactory;
import org.apache.hugegraph.auth.AuthManager;
import org.apache.hugegraph.auth.StandardAuthManager;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.LocalCounter;
import org.apache.hugegraph.backend.cache.Cache;
import org.apache.hugegraph.backend.cache.CacheNotifier;
import org.apache.hugegraph.backend.cache.CacheNotifier.GraphCacheNotifier;
import org.apache.hugegraph.backend.cache.CacheNotifier.SchemaCacheNotifier;
import org.apache.hugegraph.backend.cache.CachedGraphTransaction;
import org.apache.hugegraph.backend.cache.CachedSchemaTransaction;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.id.SnowflakeIdGenerator;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.AbstractSerializer;
import org.apache.hugegraph.backend.serializer.SerializerFactory;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendProviderFactory;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreInfo;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.RaftBackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.RaftGroupManager;
import org.apache.hugegraph.backend.store.ram.RamTable;
import org.apache.hugegraph.task.EphemeralJobQueue;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.backend.tx.SchemaTransaction;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.config.TypedOption;
import org.apache.hugegraph.event.EventHub;
import org.apache.hugegraph.event.EventListener;
import org.apache.hugegraph.exception.NotAllowException;
import org.apache.hugegraph.io.HugeGraphIoRegistry;
import org.apache.hugegraph.job.EphemeralJob;
import org.apache.hugegraph.masterelection.ClusterRoleStore;
import org.apache.hugegraph.masterelection.Config;
import org.apache.hugegraph.masterelection.RoleElectionConfig;
import org.apache.hugegraph.masterelection.RoleElectionOptions;
import org.apache.hugegraph.masterelection.RoleElectionStateMachine;
import org.apache.hugegraph.masterelection.StandardClusterRoleStore;
import org.apache.hugegraph.masterelection.StandardRoleElectionStateMachine;
import org.apache.hugegraph.perf.PerfUtil.Watched;
import org.apache.hugegraph.rpc.RpcServiceConfig4Client;
import org.apache.hugegraph.rpc.RpcServiceConfig4Server;
import org.apache.hugegraph.schema.EdgeLabel;
import org.apache.hugegraph.schema.IndexLabel;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.schema.SchemaElement;
import org.apache.hugegraph.schema.SchemaLabel;
import org.apache.hugegraph.schema.SchemaManager;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.structure.HugeEdgeProperty;
import org.apache.hugegraph.structure.HugeFeatures;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.structure.HugeVertexProperty;
import org.apache.hugegraph.task.ServerInfoManager;
import org.apache.hugegraph.task.TaskManager;
import org.apache.hugegraph.task.TaskScheduler;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.GraphMode;
import org.apache.hugegraph.type.define.GraphReadMode;
import org.apache.hugegraph.type.define.NodeRole;
import org.apache.hugegraph.util.ConfigUtil;
import org.apache.hugegraph.util.DateUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.LockUtil;
import org.apache.hugegraph.util.Log;
import org.apache.hugegraph.variables.HugeVariables;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.io.Io;
import org.apache.tinkerpop.gremlin.structure.util.AbstractThreadLocalTransaction;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.slf4j.Logger;
import com.alipay.remoting.rpc.RpcServer;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.RateLimiter;
/**
* StandardHugeGraph is the entrance of the graph system, you can modify or
* query the schema/vertex/edge data through this class.
*/
public class StandardHugeGraph implements HugeGraph {
public static final Class<?>[] PROTECT_CLASSES = {
StandardHugeGraph.class,
StandardHugeGraph.StandardHugeGraphParams.class,
TinkerPopTransaction.class,
StandardHugeGraph.Txs.class,
StandardHugeGraph.SysTransaction.class
};
public static final Set<TypedOption<?, ?>> ALLOWED_CONFIGS = ImmutableSet.of(
CoreOptions.TASK_WAIT_TIMEOUT,
CoreOptions.TASK_SYNC_DELETION,
CoreOptions.TASK_TTL_DELETE_BATCH,
CoreOptions.TASK_INPUT_SIZE_LIMIT,
CoreOptions.TASK_RESULT_SIZE_LIMIT,
CoreOptions.OLTP_CONCURRENT_THREADS,
CoreOptions.OLTP_CONCURRENT_DEPTH,
CoreOptions.OLTP_COLLECTION_TYPE,
CoreOptions.VERTEX_DEFAULT_LABEL,
CoreOptions.VERTEX_ENCODE_PK_NUMBER,
CoreOptions.STORE_GRAPH,
CoreOptions.STORE
);
private static final Logger LOG = Log.logger(StandardHugeGraph.class);
private volatile boolean started;
private volatile boolean closed;
private volatile GraphMode mode;
private volatile GraphReadMode readMode;
private volatile HugeVariables variables;
private final String name;
private final StandardHugeGraphParams params;
private final HugeConfig configuration;
private final EventHub schemaEventHub;
private final EventHub graphEventHub;
private final EventHub indexEventHub;
private final LocalCounter localCounter;
private final RateLimiter writeRateLimiter;
private final RateLimiter readRateLimiter;
private final TaskManager taskManager;
private AuthManager authManager;
private RoleElectionStateMachine roleElectionStateMachine;
private final HugeFeatures features;
private final BackendStoreProvider storeProvider;
private final TinkerPopTransaction tx;
private final RamTable ramtable;
public StandardHugeGraph(HugeConfig config) {
this.params = new StandardHugeGraphParams();
this.configuration = config;
this.schemaEventHub = new EventHub("schema");
this.graphEventHub = new EventHub("graph");
this.indexEventHub = new EventHub("index");
this.localCounter = new LocalCounter();
final int writeLimit = config.get(CoreOptions.RATE_LIMIT_WRITE);
this.writeRateLimiter = writeLimit > 0 ?
RateLimiter.create(writeLimit) : null;
final int readLimit = config.get(CoreOptions.RATE_LIMIT_READ);
this.readRateLimiter = readLimit > 0 ?
RateLimiter.create(readLimit) : null;
boolean ramtableEnable = config.get(CoreOptions.QUERY_RAMTABLE_ENABLE);
if (ramtableEnable) {
long vc = config.get(CoreOptions.QUERY_RAMTABLE_VERTICES_CAPACITY);
int ec = config.get(CoreOptions.QUERY_RAMTABLE_EDGES_CAPACITY);
this.ramtable = new RamTable(this, vc, ec);
} else {
this.ramtable = null;
}
this.taskManager = TaskManager.instance();
this.name = config.get(CoreOptions.STORE);
this.started = false;
this.closed = false;
this.mode = GraphMode.NONE;
this.readMode = GraphReadMode.OLTP_ONLY;
LockUtil.init(this.name);
try {
this.storeProvider = this.loadStoreProvider();
} catch (Exception e) {
LockUtil.destroy(this.name);
String message = "Failed to load backend store provider";
LOG.error("{}: {}", message, e.getMessage());
throw new HugeException(message, e);
}
try {
this.tx = new TinkerPopTransaction(this);
boolean supportsPersistence = this.backendStoreFeatures().supportsPersistence();
this.features = new HugeFeatures(this, supportsPersistence);
SnowflakeIdGenerator.init(this.params);
this.taskManager.addScheduler(this.params);
this.authManager = new StandardAuthManager(this.params);
this.variables = null;
} catch (Exception e) {
this.storeProvider.close();
LockUtil.destroy(this.name);
throw e;
}
}
@Override
public String name() {
return this.name;
}
@Override
public HugeGraph hugegraph() {
return this;
}
@Override
public String backend() {
return this.storeProvider.type();
}
@Override
public BackendStoreInfo backendStoreInfo() {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
// TODO: pass storeProvider.metaStore()
this.systemTransaction();
return new BackendStoreInfo(this.configuration, this.storeProvider);
}
@Override
public BackendFeatures backendStoreFeatures() {
return this.graphTransaction().storeFeatures();
}
@Override
public void serverStarted(Id serverId, NodeRole serverRole) {
LOG.info("Init system info for graph '{}'", this.name);
this.initSystemInfo();
LOG.info("Init server info [{}-{}] for graph '{}'...",
serverId, serverRole, this.name);
this.serverInfoManager().initServerInfo(serverId, serverRole);
this.initRoleStateWorker(serverId);
// TODO: check necessary?
LOG.info("Check olap property-key tables for graph '{}'", this.name);
for (PropertyKey pk : this.schemaTransaction().getPropertyKeys()) {
if (pk.olap()) {
this.graphTransaction().initAndRegisterOlapTable(pk.id());
}
}
LOG.info("Restoring incomplete tasks for graph '{}'...", this.name);
this.taskScheduler().restoreTasks();
this.started = true;
}
private void initRoleStateWorker(Id serverId) {
Config roleStateMachineConfig = new RoleElectionConfig(serverId.toString(),
this.configuration.get(RoleElectionOptions.NODE_EXTERNAL_URL),
this.configuration.get(RoleElectionOptions.EXCEEDS_FAIL_COUNT),
this.configuration.get(RoleElectionOptions.RANDOM_TIMEOUT_MILLISECOND),
this.configuration.get(RoleElectionOptions.HEARTBEAT_INTERVAL_SECOND),
this.configuration.get(RoleElectionOptions.MASTER_DEAD_TIMES),
this.configuration.get(RoleElectionOptions.BASE_TIMEOUT_MILLISECOND));
ClusterRoleStore clusterRoleStore = new StandardClusterRoleStore(this.params);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleStateMachineConfig, clusterRoleStore);
}
@Override
public boolean started() {
return this.started;
}
@Override
public boolean closed() {
if (this.closed && !this.tx.closed()) {
LOG.warn("The tx is not closed while graph '{}' is closed", this);
}
return this.closed;
}
@Override
public GraphMode mode() {
return this.mode;
}
@Override
public void mode(GraphMode mode) {
LOG.info("Graph {} will work in {} mode", this, mode);
this.mode = mode;
}
@Override
public GraphReadMode readMode() {
return this.readMode;
}
@Override
public void readMode(GraphReadMode readMode) {
this.clearVertexCache();
this.readMode = readMode;
}
@Override
public void waitReady(RpcServer rpcServer) {
// Just for trigger Tx.getOrNewTransaction, then load 3 stores
this.schemaTransaction();
this.storeProvider.waitReady(rpcServer);
}
@Override
public void initBackend() {
this.loadSchemaStore().open(this.configuration);
this.loadSystemStore().open(this.configuration);
this.loadGraphStore().open(this.configuration);
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.init();
/*
* NOTE: The main goal is to write the serverInfo to the central
* node, such as etcd, and also create the system schema in memory,
* which has no side effects
*/
this.initSystemInfo();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
this.loadGraphStore().close();
this.loadSystemStore().close();
this.loadSchemaStore().close();
}
LOG.info("Graph '{}' has been initialized", this.name);
}
@Override
public void clearBackend() {
this.waitUntilAllTasksCompleted();
this.loadSchemaStore().open(this.configuration);
this.loadSystemStore().open(this.configuration);
this.loadGraphStore().open(this.configuration);
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.clear();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
this.loadGraphStore().close();
this.loadSystemStore().close();
this.loadSchemaStore().close();
}
LOG.info("Graph '{}' has been cleared", this.name);
}
@Override
public void truncateBackend() {
this.waitUntilAllTasksCompleted();
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.truncate();
// TODO: remove this after serverinfo saved in etcd
this.serverStarted(this.serverInfoManager().selfServerId(),
this.serverInfoManager().selfServerRole());
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
}
LOG.info("Graph '{}' has been truncated", this.name);
}
@Override
public void initSystemInfo() {
try {
this.taskScheduler().init();
this.serverInfoManager().init();
this.authManager().init();
} finally {
this.closeTx();
}
LOG.debug("Graph '{}' system info has been initialized", this);
}
@Override
public void createSnapshot() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.createSnapshot();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
}
LOG.info("Graph '{}' has created snapshot", this.name);
}
@Override
public void resumeSnapshot() {
LockUtil.lock(this.name, LockUtil.GRAPH_LOCK);
try {
this.storeProvider.resumeSnapshot();
} finally {
LockUtil.unlock(this.name, LockUtil.GRAPH_LOCK);
}
LOG.info("Graph '{}' has resumed from snapshot", this.name);
}
private void clearVertexCache() {
Future<?> future = this.graphEventHub.notify(Events.CACHE, "clear",
HugeType.VERTEX);
try {
future.get();
} catch (Throwable e) {
LOG.warn("Error when waiting for event execution: vertex cache " +
"clear", e);
}
}
private SchemaTransaction openSchemaTransaction() throws HugeException {
this.checkGraphNotClosed();
try {
return new CachedSchemaTransaction(this.params, loadSchemaStore());
} catch (BackendException e) {
String message = "Failed to open schema transaction";
LOG.error("{}", message, e);
throw new HugeException(message, e);
}
}
private SysTransaction openSystemTransaction() throws HugeException {
this.checkGraphNotClosed();
try {
return new SysTransaction(this.params, loadSystemStore());
} catch (BackendException e) {
String message = "Failed to open system transaction";
LOG.error("{}", message, e);
throw new HugeException(message);
}
}
private GraphTransaction openGraphTransaction() throws HugeException {
// Open a new one
this.checkGraphNotClosed();
try {
return new CachedGraphTransaction(this.params, loadGraphStore());
} catch (BackendException e) {
String message = "Failed to open graph transaction";
LOG.error("{}", message, e);
throw new HugeException(message);
}
}
private void checkGraphNotClosed() {
E.checkState(!this.closed, "Graph '%s' has been closed", this);
}
private BackendStore loadSchemaStore() {
return this.storeProvider.loadSchemaStore(this.configuration);
}
private BackendStore loadGraphStore() {
return this.storeProvider.loadGraphStore(this.configuration);
}
private BackendStore loadSystemStore() {
return this.storeProvider.loadSystemStore(this.configuration);
}
@Watched
private SchemaTransaction schemaTransaction() {
this.checkGraphNotClosed();
/*
* NOTE: each schema operation will be auto committed,
* Don't need to open tinkerpop tx by readWrite() and commit manually.
*/
return this.tx.schemaTransaction();
}
private SysTransaction systemTransaction() {
this.checkGraphNotClosed();
/*
* NOTE: system operations must be committed manually,
* Maybe users need to auto open tinkerpop tx by readWrite().
*/
this.tx.readWrite();
return this.tx.systemTransaction();
}
@Watched
private GraphTransaction graphTransaction() {
this.checkGraphNotClosed();
/*
* NOTE: graph operations must be committed manually,
* Maybe users need to auto open tinkerpop tx by readWrite().
*/
this.tx.readWrite();
return this.tx.graphTransaction();
}
private BackendStoreProvider loadStoreProvider() {
return BackendProviderFactory.open(this.params);
}
private AbstractSerializer serializer() {
String name = this.configuration.get(CoreOptions.SERIALIZER);
LOG.debug("Loading serializer '{}' for graph '{}'", name, this.name);
AbstractSerializer serializer = SerializerFactory.serializer(this.configuration, name);
if (serializer == null) {
throw new HugeException("Can't load serializer with name " + name);
}
return serializer;
}
private Analyzer analyzer() {
String name = this.configuration.get(CoreOptions.TEXT_ANALYZER);
String mode = this.configuration.get(CoreOptions.TEXT_ANALYZER_MODE);
LOG.debug("Loading text analyzer '{}' with mode '{}' for graph '{}'",
name, mode, this.name);
return AnalyzerFactory.analyzer(name, mode);
}
protected void reloadRamtable() {
this.reloadRamtable(false);
}
protected void reloadRamtable(boolean loadFromFile) {
// Expect triggered manually, like gremlin job
if (this.ramtable != null) {
this.ramtable.reload(loadFromFile, this.name);
} else {
LOG.warn("The ramtable feature is not enabled for graph {}", this);
}
}
@Override
public <C extends GraphComputer> C compute(Class<C> clazz)
throws IllegalArgumentException {
throw Graph.Exceptions.graphComputerNotSupported();
}
@Override
public GraphComputer compute() throws IllegalArgumentException {
throw Graph.Exceptions.graphComputerNotSupported();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public <I extends Io> I io(final Io.Builder<I> builder) {
return (I) builder.graph(this).onMapper(mapper ->
mapper.addRegistry(HugeGraphIoRegistry.instance())
).create();
}
@Override
public Vertex addVertex(Object... keyValues) {
return this.graphTransaction().addVertex(keyValues);
}
@Override
public void removeVertex(Vertex vertex) {
this.graphTransaction().removeVertex((HugeVertex) vertex);
}
@Override
public void removeVertex(String label, Object id) {
if (label != null) {
VertexLabel vl = this.vertexLabel(label);
// It's OK even if exist adjacent edges `vl.existsLinkLabel()`
if (!vl.existsIndexLabel()) {
// Improve perf by removeVertex(id)
Id idValue = HugeVertex.getIdValue(id);
HugeVertex vertex = new HugeVertex(this, idValue, vl);
this.removeVertex(vertex);
return;
}
}
this.vertex(id).remove();
}
@Override
public <V> void addVertexProperty(VertexProperty<V> p) {
this.graphTransaction().addVertexProperty((HugeVertexProperty<V>) p);
}
@Override
public <V> void removeVertexProperty(VertexProperty<V> p) {
this.graphTransaction().removeVertexProperty((HugeVertexProperty<V>) p);
}
@Override
public Edge addEdge(Edge edge) {
return this.graphTransaction().addEdge((HugeEdge) edge);
}
@Override
public void canAddEdge(Edge edge) {
// pass
}
@Override
public void removeEdge(Edge edge) {
this.graphTransaction().removeEdge((HugeEdge) edge);
}
@Override
public void removeEdge(String label, Object id) {
if (label != null) {
EdgeLabel el = this.edgeLabel(label);
if (!el.existsIndexLabel()) {
// Improve perf by removeEdge(id)
Id idValue = HugeEdge.getIdValue(id, false);
HugeEdge edge = new HugeEdge(this, idValue, el);
this.removeEdge(edge);
return;
}
}
this.edge(id).remove();
}
@Override
public <V> void addEdgeProperty(Property<V> p) {
this.graphTransaction().addEdgeProperty((HugeEdgeProperty<V>) p);
}
@Override
public <V> void removeEdgeProperty(Property<V> p) {
this.graphTransaction().removeEdgeProperty((HugeEdgeProperty<V>) p);
}
@Override
public Vertex vertex(Object object) {
return this.graphTransaction().queryVertex(object);
}
@Override
public Iterator<Vertex> vertices(Object... objects) {
if (objects.length == 0) {
return this.graphTransaction().queryVertices();
}
return this.graphTransaction().queryVertices(objects);
}
@Override
public Iterator<Vertex> vertices(Query query) {
return this.graphTransaction().queryVertices(query);
}
@Override
public Iterator<Vertex> adjacentVertex(Object id) {
return this.graphTransaction().queryAdjacentVertices(id);
}
@Override
public boolean checkAdjacentVertexExist() {
return this.graphTransaction().checkAdjacentVertexExist();
}
@Override
public Edge edge(Object object) {
return this.graphTransaction().queryEdge(object);
}
@Override
public Iterator<Edge> edges(Object... objects) {
if (objects.length == 0) {
return this.graphTransaction().queryEdges();
}
return this.graphTransaction().queryEdges(objects);
}
@Override
@Watched
public Iterator<Edge> edges(Query query) {
return this.graphTransaction().queryEdges(query);
}
@Override
public Iterator<Vertex> adjacentVertices(Iterator<Edge> edges) {
return this.graphTransaction().queryAdjacentVertices(edges);
}
@Override
public Iterator<Edge> adjacentEdges(Id vertexId) {
return this.graphTransaction().queryEdgesByVertex(vertexId);
}
@Override
public Number queryNumber(Query query) {
return this.graphTransaction().queryNumber(query);
}
@Override
public Id addPropertyKey(PropertyKey pkey) {
assert this.name.equals(pkey.graph().name());
if (pkey.olap()) {
this.clearVertexCache();
}
return this.schemaTransaction().addPropertyKey(pkey);
}
@Override
public void updatePropertyKey(PropertyKey pkey) {
assert this.name.equals(pkey.graph().name());
this.schemaTransaction().updatePropertyKey(pkey);
}
@Override
public Id removePropertyKey(Id pkey) {
if (this.propertyKey(pkey).olap()) {
this.clearVertexCache();
}
return this.schemaTransaction().removePropertyKey(pkey);
}
@Override
public Collection<PropertyKey> propertyKeys() {
return this.schemaTransaction().getPropertyKeys();
}
@Override
public PropertyKey propertyKey(Id id) {
PropertyKey pk = this.schemaTransaction().getPropertyKey(id);
E.checkArgument(pk != null, "Undefined property key with id: '%s'", id);
return pk;
}
@Override
public PropertyKey propertyKey(String name) {
PropertyKey pk = this.schemaTransaction().getPropertyKey(name);
E.checkArgument(pk != null, "Undefined property key: '%s'", name);
return pk;
}
@Override
public Id clearPropertyKey(PropertyKey propertyKey) {
if (propertyKey.oltp()) {
return IdGenerator.ZERO;
}
this.clearVertexCache();
return this.schemaTransaction().clearOlapPk(propertyKey);
}
@Override
public boolean existsPropertyKey(String name) {
return this.schemaTransaction().getPropertyKey(name) != null;
}
@Override
public void addVertexLabel(VertexLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().addVertexLabel(label);
}
@Override
public void updateVertexLabel(VertexLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().updateVertexLabel(label);
}
@Override
public Id removeVertexLabel(Id label) {
return this.schemaTransaction().removeVertexLabel(label);
}
@Override
public Collection<VertexLabel> vertexLabels() {
return this.schemaTransaction().getVertexLabels();
}
@Override
@Watched
public VertexLabel vertexLabelOrNone(Id id) {
VertexLabel vl = this.schemaTransaction().getVertexLabel(id);
if (vl == null) {
vl = VertexLabel.undefined(this, id);
}
return vl;
}
@Override
public VertexLabel vertexLabel(Id id) {
VertexLabel vl = this.schemaTransaction().getVertexLabel(id);
E.checkArgument(vl != null, "Undefined vertex label with id: '%s'", id);
return vl;
}
@Override
public VertexLabel vertexLabel(String name) {
VertexLabel vl = this.schemaTransaction().getVertexLabel(name);
E.checkArgument(vl != null, "Undefined vertex label: '%s'", name);
return vl;
}
@Override
public boolean existsVertexLabel(String name) {
return this.schemaTransaction().getVertexLabel(name) != null;
}
@Override
public boolean existsLinkLabel(Id vertexLabel) {
List<EdgeLabel> edgeLabels = this.schemaTransaction().getEdgeLabels();
for (EdgeLabel edgeLabel : edgeLabels) {
if (edgeLabel.linkWithLabel(vertexLabel)) {
return true;
}
}
return false;
}
@Override
public void addEdgeLabel(EdgeLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().addEdgeLabel(label);
}
@Override
public void updateEdgeLabel(EdgeLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().updateEdgeLabel(label);
}
@Override
public Id removeEdgeLabel(Id id) {
return this.schemaTransaction().removeEdgeLabel(id);
}
@Override
public Collection<EdgeLabel> edgeLabels() {
return this.schemaTransaction().getEdgeLabels();
}
@Override
public EdgeLabel edgeLabelOrNone(Id id) {
EdgeLabel el = this.schemaTransaction().getEdgeLabel(id);
if (el == null) {
el = EdgeLabel.undefined(this, id);
}
return el;
}
@Override
public EdgeLabel edgeLabel(Id id) {
EdgeLabel el = this.schemaTransaction().getEdgeLabel(id);
E.checkArgument(el != null, "Undefined edge label with id: '%s'", id);
return el;
}
@Override
public EdgeLabel edgeLabel(String name) {
EdgeLabel el = this.schemaTransaction().getEdgeLabel(name);
E.checkArgument(el != null, "Undefined edge label: '%s'", name);
return el;
}
@Override
public boolean existsEdgeLabel(String name) {
return this.schemaTransaction().getEdgeLabel(name) != null;
}
@Override
public void addIndexLabel(SchemaLabel schemaLabel, IndexLabel indexLabel) {
assert VertexLabel.OLAP_VL.equals(schemaLabel) ||
this.name.equals(schemaLabel.graph().name());
assert this.name.equals(indexLabel.graph().name());
this.schemaTransaction().addIndexLabel(schemaLabel, indexLabel);
}
@Override
public void updateIndexLabel(IndexLabel label) {
assert this.name.equals(label.graph().name());
this.schemaTransaction().updateIndexLabel(label);
}
@Override
public Id removeIndexLabel(Id id) {
return this.schemaTransaction().removeIndexLabel(id);
}
@Override
public Id rebuildIndex(SchemaElement schema) {
return this.schemaTransaction().rebuildIndex(schema);
}
@Override
public Collection<IndexLabel> indexLabels() {
return this.schemaTransaction().getIndexLabels();
}
@Override
public IndexLabel indexLabel(Id id) {
IndexLabel il = this.schemaTransaction().getIndexLabel(id);
E.checkArgument(il != null, "Undefined index label with id: '%s'", id);
return il;
}
@Override
public IndexLabel indexLabel(String name) {
IndexLabel il = this.schemaTransaction().getIndexLabel(name);
E.checkArgument(il != null, "Undefined index label: '%s'", name);
return il;
}
@Override
public boolean existsIndexLabel(String name) {
return this.schemaTransaction().getIndexLabel(name) != null;
}
@Override
public Transaction tx() {
return this.tx;
}
@Override
public synchronized void close() throws Exception {
if (this.closed()) {
return;
}
LOG.info("Close graph {}", this);
if (StandardAuthManager.isLocal(this.authManager)) {
this.authManager.close();
}
this.taskManager.closeScheduler(this.params);
try {
this.closeTx();
} finally {
this.closed = true;
this.storeProvider.close();
LockUtil.destroy(this.name);
}
// Make sure that all transactions are closed in all threads
E.checkState(this.tx.closed(),
"Ensure tx closed in all threads when closing graph '%s'",
this.name);
}
@Override
public void create(String configPath, Id server, NodeRole role) {
this.initBackend();
this.serverStarted(server, role);
// Write config to disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
}
@Override
public void drop() {
this.clearBackend();
HugeConfig config = this.configuration();
this.storeProvider.onDeleteConfig(config);
ConfigUtil.deleteFile(config.file());
try {
/*
* It's hard to ensure all threads close the tx.
* TODO:
* - schedule a tx-close to each thread,
* or
* - add forceClose() method to backend store.
*/
this.close();
} catch (Throwable e) {
LOG.warn("Failed to close graph {} {}", this, e);
}
}
@Override
public HugeConfig cloneConfig(String newGraph) {
HugeConfig config = (HugeConfig) this.configuration().clone();
this.storeProvider.onCloneConfig(config, newGraph);
return config;
}
@Override
public HugeFeatures features() {
return this.features;
}
@Override
public synchronized Variables variables() {
if (this.variables == null) {
this.variables = new HugeVariables(this.params);
}
// Ensure variables() work after variables schema was cleared
this.variables.initSchemaIfNeeded();
return this.variables;
}
@Override
public SchemaManager schema() {
return new SchemaManager(this.schemaTransaction(), this);
}
@Override
public Id getNextId(HugeType type) {
return this.schemaTransaction().getNextId(type);
}
@Override
public <T> T metadata(HugeType type, String meta, Object... args) {
return this.graphTransaction().metadata(type, meta, args);
}
@Override
public TaskScheduler taskScheduler() {
TaskScheduler scheduler = this.taskManager.getScheduler(this.params);
E.checkState(scheduler != null,
"Can't find task scheduler for graph '%s'", this);
return scheduler;
}
private ServerInfoManager serverInfoManager() {
ServerInfoManager manager = this.taskManager
.getServerInfoManager(this.params);
E.checkState(manager != null,
"Can't find server info manager for graph '%s'", this);
return manager;
}
@Override
public AuthManager authManager() {
// this.authManager.initSchemaIfNeeded();
return this.authManager;
}
@Override
public RoleElectionStateMachine roleElectionStateMachine() {
return this.roleElectionStateMachine;
}
@Override
public void switchAuthManager(AuthManager authManager) {
this.authManager = authManager;
}
@Override
public RaftGroupManager raftGroupManager() {
if (!(this.storeProvider instanceof RaftBackendStoreProvider)) {
return null;
}
RaftBackendStoreProvider provider =
((RaftBackendStoreProvider) this.storeProvider);
return provider.raftNodeManager();
}
@Override
public HugeConfig configuration() {
return this.configuration;
}
@Override
public String toString() {
return StringFactory.graphString(this, this.name());
}
@Override
public final void proxy(HugeGraph graph) {
this.params.graph(graph);
}
@Override
public boolean sameAs(HugeGraph graph) {
return this == graph;
}
@Override
public long now() {
return ((TinkerPopTransaction) this.tx()).openedTime();
}
@Override
public <K, V> V option(TypedOption<K, V> option) {
HugeConfig config = this.configuration();
if (!ALLOWED_CONFIGS.contains(option)) {
throw new NotAllowException("Not allowed to access config: %s",
option.name());
}
return config.get(option);
}
@Override
public void registerRpcServices(RpcServiceConfig4Server serverConfig,
RpcServiceConfig4Client clientConfig) {
/*
* Skip register cache-rpc service if it's non-shared storage,
* because we assume cache of non-shared storage is updated by raft.
*/
if (!this.backendStoreFeatures().supportsSharedStorage()) {
return;
}
Class<GraphCacheNotifier> clazz1 = GraphCacheNotifier.class;
// The proxy is sometimes unavailable (issue #664)
CacheNotifier proxy = clientConfig.serviceProxy(this.name, clazz1);
serverConfig.addService(this.name, clazz1, new HugeGraphCacheNotifier(
this.graphEventHub, proxy));
Class<SchemaCacheNotifier> clazz2 = SchemaCacheNotifier.class;
proxy = clientConfig.serviceProxy(this.name, clazz2);
serverConfig.addService(this.name, clazz2, new HugeSchemaCacheNotifier(
this.schemaEventHub, proxy));
}
private void closeTx() {
try {
if (this.tx.isOpen()) {
this.tx.close();
}
} finally {
this.tx.destroyTransaction();
}
}
private void waitUntilAllTasksCompleted() {
long timeout = this.configuration.get(CoreOptions.TASK_WAIT_TIMEOUT);
try {
this.taskScheduler().waitUntilAllTasksCompleted(timeout);
} catch (TimeoutException e) {
throw new HugeException("Failed to wait all tasks to complete", e);
}
}
private class StandardHugeGraphParams implements HugeGraphParams {
private HugeGraph graph = StandardHugeGraph.this;
private final EphemeralJobQueue ephemeralJobQueue = new EphemeralJobQueue(this);
private void graph(HugeGraph graph) {
this.graph = graph;
}
@Override
public HugeGraph graph() {
return this.graph;
}
@Override
public String name() {
return StandardHugeGraph.this.name();
}
@Override
public GraphMode mode() {
return StandardHugeGraph.this.mode();
}
@Override
public GraphReadMode readMode() {
return StandardHugeGraph.this.readMode();
}
@Override
public SchemaTransaction schemaTransaction() {
return StandardHugeGraph.this.schemaTransaction();
}
@Override
public GraphTransaction systemTransaction() {
return StandardHugeGraph.this.systemTransaction();
}
@Override
public GraphTransaction graphTransaction() {
return StandardHugeGraph.this.graphTransaction();
}
@Override
public GraphTransaction openTransaction() {
// Open a new one
return StandardHugeGraph.this.openGraphTransaction();
}
@Override
public void closeTx() {
StandardHugeGraph.this.closeTx();
}
@Override
public boolean started() {
return StandardHugeGraph.this.started();
}
@Override
public boolean closed() {
return StandardHugeGraph.this.closed();
}
@Override
public boolean initialized() {
return StandardHugeGraph.this.graphTransaction().storeInitialized();
}
@Override
public BackendFeatures backendStoreFeatures() {
return StandardHugeGraph.this.backendStoreFeatures();
}
@Override
public BackendStore loadSchemaStore() {
return StandardHugeGraph.this.loadSchemaStore();
}
@Override
public BackendStore loadGraphStore() {
return StandardHugeGraph.this.loadGraphStore();
}
@Override
public BackendStore loadSystemStore() {
return StandardHugeGraph.this.loadSystemStore();
}
@Override
public EventHub schemaEventHub() {
return StandardHugeGraph.this.schemaEventHub;
}
@Override
public EventHub graphEventHub() {
return StandardHugeGraph.this.graphEventHub;
}
@Override
public EventHub indexEventHub() {
return StandardHugeGraph.this.indexEventHub;
}
@Override
public HugeConfig configuration() {
return StandardHugeGraph.this.configuration();
}
@Override
public ServerInfoManager serverManager() {
// this.serverManager.initSchemaIfNeeded();
return StandardHugeGraph.this.serverInfoManager();
}
@Override
public LocalCounter counter() {
return StandardHugeGraph.this.localCounter;
}
@Override
public AbstractSerializer serializer() {
return StandardHugeGraph.this.serializer();
}
@Override
public Analyzer analyzer() {
return StandardHugeGraph.this.analyzer();
}
@Override
public RateLimiter writeRateLimiter() {
return StandardHugeGraph.this.writeRateLimiter;
}
@Override
public RateLimiter readRateLimiter() {
return StandardHugeGraph.this.readRateLimiter;
}
@Override
public RamTable ramtable() {
return StandardHugeGraph.this.ramtable;
}
@Override
public <T> void submitEphemeralJob(EphemeralJob<T> job) {
this.ephemeralJobQueue.add(job);
}
}
private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
// Times opened from upper layer
private final AtomicInteger refs;
// Flag opened of each thread
private final ThreadLocal<Boolean> opened;
// Backend transactions
private final ThreadLocal<Txs> transactions;
public TinkerPopTransaction(Graph graph) {
super(graph);
this.refs = new AtomicInteger();
this.opened = ThreadLocal.withInitial(() -> false);
this.transactions = ThreadLocal.withInitial(() -> null);
}
public boolean closed() {
int refs = this.refs.get();
assert refs >= 0 : refs;
return refs == 0;
}
/**
* Commit tx if batch size reaches the specified value,
* it may be used by Gremlin
*/
@SuppressWarnings("unused")
public void commitIfGtSize(int size) {
// Only commit graph transaction data (schema auto committed)
this.graphTransaction().commitIfGtSize(size);
}
@Override
public void commit() {
try {
super.commit();
} finally {
this.setClosed();
}
}
@Override
public void rollback() {
try {
super.rollback();
} finally {
this.setClosed();
}
}
@Override
public <G extends Graph> G createThreadedTx() {
throw Transaction.Exceptions.threadedTransactionsNotSupported();
}
@Override
public boolean isOpen() {
return this.opened.get();
}
@Override
protected void doOpen() {
this.getOrNewTransaction();
this.setOpened();
}
@Override
protected void doCommit() {
this.verifyOpened();
this.getOrNewTransaction().commit();
}
@Override
protected void doRollback() {
this.verifyOpened();
this.getOrNewTransaction().rollback();
}
@Override
protected void doClose() {
this.verifyOpened();
try {
// Calling super.doClose() will clear listeners
super.doClose();
} finally {
this.resetState();
}
}
@Override
public String toString() {
return String.format("TinkerPopTransaction{opened=%s, txs=%s}",
this.opened.get(), this.transactions.get());
}
public long openedTime() {
return this.transactions.get().openedTime();
}
private void verifyOpened() {
if (!this.isOpen()) {
throw new HugeException("Transaction has not been opened");
}
}
private void resetState() {
this.setClosed();
this.readWriteConsumerInternal.set(READ_WRITE_BEHAVIOR.AUTO);
this.closeConsumerInternal.set(CLOSE_BEHAVIOR.ROLLBACK);
}
private void setOpened() {
// The backend tx may be reused, here just set a flag
assert !this.opened.get();
this.opened.set(true);
this.transactions.get().openedTime(DateUtil.now().getTime());
this.refs.incrementAndGet();
}
private void setClosed() {
// Just set flag opened=false to reuse the backend tx
if (this.opened.get()) {
this.opened.set(false);
this.refs.decrementAndGet();
}
}
private SchemaTransaction schemaTransaction() {
return this.getOrNewTransaction().schemaTx;
}
private SysTransaction systemTransaction() {
return this.getOrNewTransaction().systemTx;
}
private GraphTransaction graphTransaction() {
return this.getOrNewTransaction().graphTx;
}
private Txs getOrNewTransaction() {
/*
* NOTE: this method may be called even tx is not opened,
* the reason is for reusing backend tx.
* so we don't call this.verifyOpened() here.
*/
Txs txs = this.transactions.get();
if (txs == null) {
SchemaTransaction schemaTransaction = null;
SysTransaction sysTransaction = null;
GraphTransaction graphTransaction = null;
try {
schemaTransaction = openSchemaTransaction();
sysTransaction = openSystemTransaction();
graphTransaction = openGraphTransaction();
txs = new Txs(schemaTransaction, sysTransaction,
graphTransaction);
} catch (Throwable e) {
if (schemaTransaction != null) {
schemaTransaction.close();
}
if (sysTransaction != null) {
sysTransaction.close();
}
if (graphTransaction != null) {
graphTransaction.close();
}
throw e;
}
this.transactions.set(txs);
}
return txs;
}
private void destroyTransaction() {
if (this.isOpen()) {
throw new HugeException(
"Transaction should be closed before destroying");
}
// Do close if needed, then remove the reference
Txs txs = this.transactions.get();
if (txs != null) {
txs.close();
}
this.transactions.remove();
}
}
private static final class Txs {
private final SchemaTransaction schemaTx;
private final SysTransaction systemTx;
private final GraphTransaction graphTx;
private long openedTime;
public Txs(SchemaTransaction schemaTx, SysTransaction systemTx,
GraphTransaction graphTx) {
assert schemaTx != null && systemTx != null && graphTx != null;
this.schemaTx = schemaTx;
this.systemTx = systemTx;
this.graphTx = graphTx;
this.openedTime = DateUtil.now().getTime();
}
public void commit() {
this.graphTx.commit();
}
public void rollback() {
this.graphTx.rollback();
}
public void close() {
try {
this.graphTx.close();
} catch (Exception e) {
LOG.error("Failed to close GraphTransaction", e);
}
try {
this.systemTx.close();
} catch (Exception e) {
LOG.error("Failed to close SystemTransaction", e);
}
try {
this.schemaTx.close();
} catch (Exception e) {
LOG.error("Failed to close SchemaTransaction", e);
}
}
public void openedTime(long time) {
this.openedTime = time;
}
public long openedTime() {
return this.openedTime;
}
@Override
public String toString() {
return String.format("{schemaTx=%s,systemTx=%s,graphTx=%s}",
this.schemaTx, this.systemTx, this.graphTx);
}
}
private static class SysTransaction extends GraphTransaction {
public SysTransaction(HugeGraphParams graph, BackendStore store) {
super(graph, store);
this.autoCommit(true);
}
}
private static class AbstractCacheNotifier implements CacheNotifier {
private final EventHub hub;
private final EventListener cacheEventListener;
public AbstractCacheNotifier(EventHub hub, CacheNotifier proxy) {
this.hub = hub;
this.cacheEventListener = event -> {
Object[] args = event.args();
E.checkArgument(args.length > 0 && args[0] instanceof String,
"Expect event action argument");
if (Cache.ACTION_INVALIDED.equals(args[0])) {
event.checkArgs(String.class, HugeType.class, Object.class);
HugeType type = (HugeType) args[1];
Object ids = args[2];
if (ids instanceof Id[]) {
// argument type mismatch: proxy.invalid2(type,Id[]ids)
proxy.invalid2(type, (Id[]) ids);
} else if (ids instanceof Id) {
proxy.invalid(type, (Id) ids);
} else {
E.checkArgument(false, "Unexpected argument: %s", ids);
}
return true;
} else if (Cache.ACTION_CLEARED.equals(args[0])) {
event.checkArgs(String.class, HugeType.class);
HugeType type = (HugeType) args[1];
proxy.clear(type);
return true;
}
return false;
};
this.hub.listen(Events.CACHE, this.cacheEventListener);
}
@Override
public void close() {
this.hub.unlisten(Events.CACHE, this.cacheEventListener);
}
@Override
public void invalid(HugeType type, Id id) {
this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, id);
}
@Override
public void invalid2(HugeType type, Object[] ids) {
this.hub.notify(Events.CACHE, Cache.ACTION_INVALID, type, ids);
}
@Override
public void clear(HugeType type) {
this.hub.notify(Events.CACHE, Cache.ACTION_CLEAR, type);
}
@Override
public void reload() {
// pass
}
}
private static class HugeSchemaCacheNotifier
extends AbstractCacheNotifier
implements SchemaCacheNotifier {
public HugeSchemaCacheNotifier(EventHub hub, CacheNotifier proxy) {
super(hub, proxy);
}
}
private static class HugeGraphCacheNotifier
extends AbstractCacheNotifier
implements GraphCacheNotifier {
public HugeGraphCacheNotifier(EventHub hub, CacheNotifier proxy) {
super(hub, proxy);
}
}
}