blob: dfd72a763f5b4e835645685200988b785d66a706 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.function.Consumer;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.cache.Cache;
import org.apache.hugegraph.backend.cache.CacheManager;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.perf.PerfUtil;
import org.apache.hugegraph.schema.SchemaManager;
import org.apache.hugegraph.structure.HugeVertex;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.Log;
public abstract class PerfExampleBase {
public static final int PERSON_NUM = 70;
public static final int SOFTWARE_NUM = 30;
public static final int EDGE_NUM = 100;
protected static final Logger LOG = Log.logger(PerfExampleBase.class);
protected Set<Object> vertices = Collections.newSetFromMap(
new ConcurrentHashMap<Object, Boolean>());
protected boolean profile = false;
public int test(String[] args) throws Exception {
if (args.length != 4) {
LOG.info("Usage: threadCount times multiple profile");
return -1;
}
int threadCount = Integer.parseInt(args[0]);
int times = Integer.parseInt(args[1]);
int multiple = Integer.parseInt(args[2]);
this.profile = Boolean.parseBoolean(args[3]);
// NOTE: this test with HugeGraph is for local, change it into
// client if test with restful server from remote
HugeGraph hugegraph = ExampleUtil.loadGraph(true, this.profile);
GraphManager graph = new GraphManager(hugegraph);
initSchema(hugegraph.schema());
testInsertPerf(graph, threadCount, times, multiple);
testQueryVertexPerf(graph, threadCount, times, multiple);
testQueryEdgePerf(graph, threadCount, times, multiple);
hugegraph.close();
return 0;
}
/**
* Multi-threaded and multi-commits and batch insertion test
* @param graph graph
* @param threadCount
* The count of threads that perform the insert operation at the
* same time
* @param times
* The transaction commit times for each thread
* @param multiple
* The coefficient to multiple number of vertices(100) and edges(100)
* for each transaction commit
* @throws Exception execute may throw Exception
*/
public void testInsertPerf(GraphManager graph,
int threadCount,
int times,
int multiple)
throws Exception {
// Total vertices/edges
long n = threadCount * times * multiple;
long vertices = (PERSON_NUM + SOFTWARE_NUM) * n;
long edges = EDGE_NUM * n;
long cost = this.execute(graph, i -> {
this.testInsert(graph, times, multiple);
}, threadCount);
LOG.info("Insert rate with threads: {} vertices/s & {} edges/s, " +
"insert total {} vertices & {} edges, cost time: {}ms",
vertices * 1000 / cost, edges * 1000 / cost,
vertices, edges, cost);
graph.clearVertexCache();
}
public void testQueryVertexPerf(GraphManager graph,
int threadCount,
int times,
int multiple)
throws Exception {
long cost = this.execute(graph, i -> {
this.testQueryVertex(graph, threadCount, i, multiple);
}, threadCount);
final long size = (PERSON_NUM + SOFTWARE_NUM) * threadCount * times;
LOG.info("Query rate with threads: {} vertices/s, " +
"query total vertices {}, cost time: {}ms",
size * 1000 / cost, size, cost);
}
public void testQueryEdgePerf(GraphManager graph,
int threadCount,
int times,
int multiple)
throws Exception {
long cost = this.execute(graph, i -> {
this.testQueryEdge(graph, threadCount, i, multiple);
}, threadCount);
final long size = (PERSON_NUM + SOFTWARE_NUM) * threadCount * times;
LOG.info("Query rate with threads: {} vedges/s, " +
"query total vedges {}, cost time: {}ms",
size * 1000 / cost, size, cost);
}
protected long execute(GraphManager graph, Consumer<Integer> task,
int threadCount) throws Exception {
CyclicBarrier startBarrier = new CyclicBarrier(threadCount + 1);
CyclicBarrier endBarrier = new CyclicBarrier(threadCount + 1);
List<Thread> threads = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
int j = i;
Thread t = new Thread(() -> {
graph.initEnv();
try {
startBarrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
task.accept(j);
try {
endBarrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (this.profile) {
LOG.info("option = {}", PerfUtil.instance().toECharts());
}
graph.destroyEnv();
});
threads.add(t);
}
for (Thread t : threads) {
t.start();
}
startBarrier.await();
long beginTime = System.currentTimeMillis();
endBarrier.await();
long endTime = System.currentTimeMillis();
for (Thread t : threads) {
t.join();
}
return endTime - beginTime;
}
protected abstract void initSchema(SchemaManager schema);
protected abstract void testInsert(GraphManager graph,
int times,
int multiple);
protected void testQueryVertex(GraphManager graph,
int threads,
int thread,
int multiple) {
int i = 0;
int j = 0;
int total = 0;
for (Object id : this.vertices) {
if (i++ % multiple != 0) {
continue;
}
if (j++ % threads != thread) {
continue;
}
LOG.debug("Query vertex {}: {}", i, id);
Vertex vertex = graph.queryVertex(id);
if (!vertex.id().equals(id)) {
LOG.warn("Query vertex by id {} returned {}", id, vertex);
}
total++;
}
LOG.debug("Query vertices with thread({}): {}", thread, total);
}
protected void testQueryEdge(GraphManager graph,
int threads,
int thread,
int multiple) {
int i = 0;
int j = 0;
int totalV = 0;
int totalE = 0;
for (Object id : this.vertices) {
if (i++ % multiple != 0) {
continue;
}
if (j++ % threads != thread) {
continue;
}
LOG.debug("Query vertex {}: {}", i, id);
Iterator<Edge> edges = graph.queryVertexEdge(id, Directions.OUT);
while (edges.hasNext()) {
totalE++;
LOG.debug("Edge of vertex {}: {}", i, edges.next());
}
totalV++;
}
LOG.debug("Query edges of vertices({}) with thread({}): {}",
totalV, thread, totalE);
}
protected static class GraphManager {
private HugeGraph hugegraph;
private Cache<Id, Object> cache = CacheManager.instance()
.cache("perf-test");
public GraphManager(HugeGraph hugegraph) {
this.hugegraph = hugegraph;
}
public void initEnv() {
// Cost about 6s
Whitebox.invoke(this.hugegraph.getClass(),
"graphTransaction", this.hugegraph);
}
public void destroyEnv() {
Whitebox.invoke(this.hugegraph.getClass(),
"closeTx", this.hugegraph);
}
public Transaction tx() {
return this.hugegraph.tx();
}
public GraphTraversalSource traversal() {
return this.hugegraph.traversal();
}
public Vertex addVertex(Object... keyValues) {
HugeVertex v = (HugeVertex) this.hugegraph.addVertex(keyValues);
this.cache.update(v.id(), v);
return v;
}
public Vertex getVertex(Object id) {
return (Vertex) this.cache.getOrFetch((Id) id, k -> {
return this.hugegraph.vertices(k).next();
});
}
public void clearVertexCache() {
this.cache.clear();
}
public Vertex queryVertex(Object id) {
return this.hugegraph.vertices(id).next();
}
public Iterator<Edge> queryVertexEdge(Object id, Directions direction) {
ConditionQuery q = new ConditionQuery(HugeType.EDGE);
q.eq(HugeKeys.OWNER_VERTEX, id);
q.eq(HugeKeys.DIRECTION, direction);
return this.hugegraph.edges(q);
}
}
}