blob: f0f69319d6821a23b15a1a6f1518245eb3569862 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.masterelection;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.auth.SchemaDefine;
import org.apache.hugegraph.backend.query.Condition;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.store.BackendEntry;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.Log;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
public class StandardClusterRoleStore implements ClusterRoleStore {
private static final Logger LOG = Log.logger(StandardClusterRoleStore.class);
private static final int RETRY_QUERY_TIMEOUT = 200;
private final HugeGraphParams graph;
private boolean firstTime;
public StandardClusterRoleStore(HugeGraphParams graph) {
this.graph = graph;
Schema schema = new Schema(graph);
schema.initSchemaIfNeeded();
this.firstTime = true;
}
@Override
public boolean updateIfNodePresent(ClusterRole clusterRole) {
// if epoch increase, update and return true
// if epoch equal, ignore different node, return false
Optional<Vertex> oldClusterRoleOpt = this.queryVertex();
if (oldClusterRoleOpt.isPresent()) {
ClusterRole oldClusterRole = this.from(oldClusterRoleOpt.get());
if (clusterRole.epoch() < oldClusterRole.epoch()) {
return false;
}
if (clusterRole.epoch() == oldClusterRole.epoch() &&
!Objects.equals(clusterRole.node(), oldClusterRole.node())) {
return false;
}
LOG.trace("Server {} epoch {} begin remove data old epoch {}, ",
clusterRole.node(), clusterRole.epoch(), oldClusterRole.epoch());
this.graph.systemTransaction().removeVertex((HugeVertex) oldClusterRoleOpt.get());
this.graph.systemTransaction().commitOrRollback();
LOG.trace("Server {} epoch {} success remove data old epoch {}, ",
clusterRole.node(), clusterRole.epoch(), oldClusterRole.epoch());
}
try {
GraphTransaction tx = this.graph.systemTransaction();
tx.doUpdateIfAbsent(this.constructEntry(clusterRole));
tx.commitOrRollback();
LOG.trace("Server {} epoch {} success update data",
clusterRole.node(), clusterRole.epoch());
} catch (Throwable ignore) {
LOG.trace("Server {} epoch {} fail update data",
clusterRole.node(), clusterRole.epoch());
return false;
}
return true;
}
private BackendEntry constructEntry(ClusterRole clusterRole) {
List<Object> list = new ArrayList<>(8);
list.add(T.label);
list.add(P.ROLE_DATA);
list.add(P.NODE);
list.add(clusterRole.node());
list.add(P.URL);
list.add(clusterRole.url());
list.add(P.CLOCK);
list.add(clusterRole.clock());
list.add(P.EPOCH);
list.add(clusterRole.epoch());
list.add(P.TYPE);
list.add("default");
HugeVertex vertex = this.graph.systemTransaction()
.constructVertex(false, list.toArray());
return this.graph.serializer().writeVertex(vertex);
}
@Override
public Optional<ClusterRole> query() {
Optional<Vertex> vertex = this.queryVertex();
if (!vertex.isPresent() && !this.firstTime) {
// If query nothing, retry once
try {
Thread.sleep(RETRY_QUERY_TIMEOUT);
} catch (InterruptedException ignored) {
}
vertex = this.queryVertex();
}
this.firstTime = false;
return vertex.map(this::from);
}
private ClusterRole from(Vertex vertex) {
String node = (String) vertex.property(P.NODE).value();
String url = (String) vertex.property(P.URL).value();
Long clock = (Long) vertex.property(P.CLOCK).value();
Integer epoch = (Integer) vertex.property(P.EPOCH).value();
return new ClusterRole(node, url, epoch, clock);
}
private Optional<Vertex> queryVertex() {
GraphTransaction tx = this.graph.systemTransaction();
ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
VertexLabel vl = this.graph.graph().vertexLabel(P.ROLE_DATA);
query.eq(HugeKeys.LABEL, vl.id());
query.query(Condition.eq(vl.primaryKeys().get(0), "default"));
query.showHidden(true);
Iterator<Vertex> vertexIterator = tx.queryVertices(query);
if (vertexIterator.hasNext()) {
return Optional.of(vertexIterator.next());
}
return Optional.empty();
}
public static final class P {
public static final String ROLE_DATA = Graph.Hidden.hide("role_data");
public static final String LABEL = T.label.getAccessor();
public static final String NODE = Graph.Hidden.hide("role_node");
public static final String CLOCK = Graph.Hidden.hide("role_clock");
public static final String EPOCH = Graph.Hidden.hide("role_epoch");
public static final String URL = Graph.Hidden.hide("role_url");
public static final String TYPE = Graph.Hidden.hide("role_type");
}
public static final class Schema extends SchemaDefine {
public Schema(HugeGraphParams graph) {
super(graph, P.ROLE_DATA);
}
@Override
public void initSchemaIfNeeded() {
if (this.existVertexLabel(this.label)) {
return;
}
String[] properties = this.initProperties();
VertexLabel label = this.schema()
.vertexLabel(this.label)
.enableLabelIndex(true)
.usePrimaryKeyId()
.primaryKeys(P.TYPE)
.properties(properties)
.build();
this.graph.schemaTransaction().addVertexLabel(label);
}
private String[] initProperties() {
List<String> props = new ArrayList<>();
props.add(createPropertyKey(P.NODE, DataType.TEXT));
props.add(createPropertyKey(P.URL, DataType.TEXT));
props.add(createPropertyKey(P.CLOCK, DataType.LONG));
props.add(createPropertyKey(P.EPOCH, DataType.INT));
props.add(createPropertyKey(P.TYPE, DataType.TEXT));
return super.initProperties(props);
}
}
}