/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.task;
import static org.apache.hugegraph.backend.query.Query.NO_LIMIT;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.page.PageInfo;
import org.apache.hugegraph.backend.query.Condition;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.QueryResults;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.exception.ConnectionException;
import org.apache.hugegraph.iterator.ListIterator;
import org.apache.hugegraph.iterator.MapperIterator;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.type.define.NodeRole;
import org.apache.hugegraph.util.DateUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Log;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableMap;
public class ServerInfoManager {
private static final Logger LOG = Log.logger(ServerInfoManager.class);
public static final long MAX_SERVERS = 100000L;
public static final long PAGE_SIZE = 10L;
private final HugeGraphParams graph;
private final ExecutorService dbExecutor;
private Id selfServerId;
private NodeRole selfServerRole;
private volatile boolean onlySingleNode;
private volatile boolean closed;
public ServerInfoManager(HugeGraphParams graph,
ExecutorService dbExecutor) {
E.checkNotNull(graph, "graph");
E.checkNotNull(dbExecutor, "db executor");
this.graph = graph;
this.dbExecutor = dbExecutor;
this.selfServerId = null;
this.selfServerRole = NodeRole.MASTER;
this.onlySingleNode = false;
this.closed = false;
}
public void init() {
HugeServerInfo.schema(this.graph).initSchemaIfNeeded();
}
public boolean close() {
this.closed = true;
if (!this.dbExecutor.isShutdown()) {
this.removeSelfServerInfo();
this.call(() -> {
try {
this.tx().close();
} catch (ConnectionException ignored) {
// ConnectionException means no connection established
}
this.graph.closeTx();
return null;
});
}
return true;
}
public synchronized void forceInitServerInfo(Id server, NodeRole role) {
if (this.closed) {
return;
}
E.checkArgument(server != null && role != null,
"The server id or role can't be null");
this.selfServerId = server;
this.selfServerRole = role;
this.saveServerInfo(this.selfServerId, this.selfServerRole);
}
public synchronized void initServerInfo(Id server, NodeRole role) {
E.checkArgument(server != null && role != null,
"The server id or role can't be null");
this.selfServerId = server;
this.selfServerRole = role;
HugeServerInfo existed = this.serverInfo(server);
E.checkArgument(existed == null || !existed.alive(),
"The server with name '%s' already in cluster",
server);
if (role.master()) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeServerInfo> servers = this.serverInfos(PAGE_SIZE,
page);
while (servers.hasNext()) {
existed = servers.next();
E.checkArgument(!existed.role().master() ||
!existed.alive(),
"Already existed master '%s' in current " +
"cluster", existed.id());
}
if (page != null) {
page = PageInfo.pageInfo(servers);
}
} while (page != null);
}
// TODO: save ServerInfo at AuthServer
this.saveServerInfo(this.selfServerId, this.selfServerRole);
}
public Id selfServerId() {
return this.selfServerId;
}
public NodeRole selfServerRole() {
return this.selfServerRole;
}
public boolean master() {
return this.selfServerRole != null && this.selfServerRole.master();
}
public boolean onlySingleNode() {
// Only has one master node
return this.onlySingleNode;
}
public void heartbeat() {
HugeServerInfo serverInfo = this.selfServerInfo();
if (serverInfo == null && this.selfServerId != null &&
this.selfServerRole != NodeRole.MASTER) {
serverInfo = this.saveServerInfo(this.selfServerId,
this.selfServerRole);
}
serverInfo.updateTime(DateUtil.now());
this.save(serverInfo);
}
public synchronized void decreaseLoad(int load) {
assert load > 0 : load;
HugeServerInfo serverInfo = this.selfServerInfo();
serverInfo.increaseLoad(-load);
this.save(serverInfo);
}
public int calcMaxLoad() {
// TODO: calc max load based on CPU and Memory resources
return 10000;
}
protected boolean graphReady() {
return !this.closed && this.graph.started() && this.graph.initialized();
}
protected synchronized HugeServerInfo pickWorkerNode(Collection<HugeServerInfo> servers,
HugeTask<?> task) {
HugeServerInfo master = null;
HugeServerInfo serverWithMinLoad = null;
int minLoad = Integer.MAX_VALUE;
boolean hasWorkerNode = false;
long now = DateUtil.now().getTime();
// Iterate servers to find suitable one
for (HugeServerInfo server : servers) {
if (!server.alive()) {
continue;
}
if (server.role().master()) {
master = server;
continue;
}
hasWorkerNode = true;
if (!server.suitableFor(task, now)) {
continue;
}
if (server.load() < minLoad) {
minLoad = server.load();
serverWithMinLoad = server;
}
}
boolean singleNode = !hasWorkerNode;
if (singleNode != this.onlySingleNode) {
LOG.info("Switch only_single_node to {}", singleNode);
this.onlySingleNode = singleNode;
}
// Only schedule to master if there is no workers and master is suitable
if (!hasWorkerNode) {
if (master != null && master.suitableFor(task, now)) {
serverWithMinLoad = master;
}
}
return serverWithMinLoad;
}
private GraphTransaction tx() {
assert Thread.currentThread().getName().contains("server-info-db-worker");
return this.graph.systemTransaction();
}
private HugeServerInfo saveServerInfo(Id server, NodeRole role) {
HugeServerInfo serverInfo = new HugeServerInfo(server, role);
serverInfo.maxLoad(this.calcMaxLoad());
this.save(serverInfo);
LOG.info("Init server info: {}", serverInfo);
return serverInfo;
}
private Id save(HugeServerInfo serverInfo) {
return this.call(() -> {
// Construct vertex from server info
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, serverInfo);
}
HugeVertex vertex = this.tx().constructVertex(false,
serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
});
}
private int save(Collection<HugeServerInfo> serverInfos) {
return this.call(() -> {
if (serverInfos.isEmpty()) {
return 0;
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s",
HugeServerInfo.P.SERVER);
}
// Save server info in batch
GraphTransaction tx = this.tx();
int updated = 0;
for (HugeServerInfo server : serverInfos) {
if (!server.updated()) {
continue;
}
HugeVertex vertex = tx.constructVertex(false, server.asArray());
tx.addVertex(vertex);
updated++;
}
// NOTE: actually it is auto-commit, to be improved
tx.commitOrRollback();
return updated;
});
}
private <V> V call(Callable<V> callable) {
assert !Thread.currentThread().getName().startsWith(
"server-info-db-worker") : "can't call by itself";
try {
// Pass context for db thread
callable = new TaskManager.ContextCallable<>(callable);
// Ensure all db operations are executed in dbExecutor thread(s)
return this.dbExecutor.submit(callable).get();
} catch (Throwable e) {
throw new HugeException("Failed to update/query server info: %s",
e, e.toString());
}
}
private HugeServerInfo selfServerInfo() {
HugeServerInfo selfServerInfo = this.serverInfo(this.selfServerId);
if (selfServerInfo == null) {
LOG.warn("ServerInfo is missing: {}", this.selfServerId);
}
return selfServerInfo;
}
private HugeServerInfo serverInfo(Id server) {
return this.call(() -> {
Iterator<Vertex> vertices = this.tx().queryVertices(server);
Vertex vertex = QueryResults.one(vertices);
if (vertex == null) {
return null;
}
return HugeServerInfo.fromVertex(vertex);
});
}
private HugeServerInfo removeSelfServerInfo() {
/*
* Check this.selfServerId != null to avoid graph.initialized() call.
* NOTE: graph.initialized() may throw exception if we can't connect to
* backend store, initServerInfo() is not called in this case, so
* this.selfServerId is null at this time.
*/
if (this.selfServerId != null && this.graph.initialized()) {
return this.removeServerInfo(this.selfServerId);
}
return null;
}
private HugeServerInfo removeServerInfo(Id server) {
if (server == null) {
return null;
}
LOG.info("Remove server info: {}", server);
return this.call(() -> {
Iterator<Vertex> vertices = this.tx().queryVertices(server);
Vertex vertex = QueryResults.one(vertices);
if (vertex == null) {
return null;
}
this.tx().removeVertex((HugeVertex) vertex);
return HugeServerInfo.fromVertex(vertex);
});
}
protected void updateServerInfos(Collection<HugeServerInfo> serverInfos) {
this.save(serverInfos);
}
protected Collection<HugeServerInfo> allServerInfos() {
Iterator<HugeServerInfo> infos = this.serverInfos(NO_LIMIT, null);
try (ListIterator<HugeServerInfo> iter = new ListIterator<>(
MAX_SERVERS, infos)) {
return iter.list();
} catch (Exception e) {
throw new HugeException("Failed to close server info iterator", e);
}
}
protected Iterator<HugeServerInfo> serverInfos(String page) {
return this.serverInfos(ImmutableMap.of(), PAGE_SIZE, page);
}
protected Iterator<HugeServerInfo> serverInfos(long limit, String page) {
return this.serverInfos(ImmutableMap.of(), limit, page);
}
private Iterator<HugeServerInfo> serverInfos(Map<String, Object> conditions,
long limit, String page) {
return this.call(() -> {
ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
if (page != null) {
query.page(page);
}
HugeGraph graph = this.graph.graph();
VertexLabel vl = graph.vertexLabel(HugeServerInfo.P.SERVER);
query.eq(HugeKeys.LABEL, vl.id());
for (Map.Entry<String, Object> entry : conditions.entrySet()) {
PropertyKey pk = graph.propertyKey(entry.getKey());
query.query(Condition.eq(pk.id(), entry.getValue()));
}
query.showHidden(true);
if (limit != NO_LIMIT) {
query.limit(limit);
}
Iterator<Vertex> vertices = this.tx().queryVertices(query);
Iterator<HugeServerInfo> servers =
new MapperIterator<>(vertices, HugeServerInfo::fromVertex);
// Convert iterator to list to avoid across thread tx accessed
return QueryResults.toList(servers);
});
}
private boolean supportsPaging() {
return this.graph.graph().backendStoreFeatures().supportsQueryByPage();
}
}