blob: fd17a996cfee2d1e1423f7bc0c44e08fca5f55d5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.job.algorithm;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.iterator.FilterIterator;
import org.apache.hugegraph.iterator.FlatMapperIterator;
import org.apache.hugegraph.job.UserJob;
import org.apache.hugegraph.job.algorithm.Consumers.StopExecution;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.traversal.algorithm.HugeTraverser;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.hugegraph.util.Bytes;
import org.apache.hugegraph.util.CollectionUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.util.ParameterUtil;
import com.google.common.base.Objects;
@SuppressWarnings("deprecation") // StringEscapeUtils
public abstract class AbstractAlgorithm implements Algorithm {
public static final long MAX_RESULT_SIZE = 100L * Bytes.MB;
public static final long MAX_QUERY_LIMIT = 100000000L; // about 100GB
public static final long MAX_CAPACITY = MAX_QUERY_LIMIT;
public static final int BATCH = 500;
public static final String USER_DIR = System.getProperty("user.dir");
public static final String EXPORT_PATH = USER_DIR + "/export";
public static final String CATEGORY_AGGR = "aggregate";
public static final String CATEGORY_PATH = "path";
public static final String CATEGORY_RANK = "rank";
public static final String CATEGORY_SIMI = "similarity";
public static final String CATEGORY_COMM = "community";
public static final String CATEGORY_CENT = "centrality";
public static final String KEY_DIRECTION = "direction";
public static final String KEY_LABEL = "label";
public static final String KEY_DEPTH = "depth";
public static final String KEY_DEGREE = "degree";
public static final String KEY_SAMPLE = "sample";
public static final String KEY_SOURCE_SAMPLE = "source_sample";
public static final String KEY_SOURCE_LABEL = "source_label";
public static final String KEY_SOURCE_CLABEL = "source_clabel";
public static final String KEY_TOP = "top";
public static final String KEY_TIMES = "times";
public static final String KEY_STABLE_TIMES = "stable_times";
public static final String KEY_PRECISION = "precision";
public static final String KEY_SHOW_MOD= "show_modularity";
public static final String KEY_SHOW_COMM = "show_community";
public static final String KEY_EXPORT_COMM = "export_community";
public static final String KEY_SKIP_ISOLATED = "skip_isolated";
public static final String KEY_CLEAR = "clear";
public static final String KEY_CAPACITY = "capacity";
public static final String KEY_LIMIT = "limit";
public static final String KEY_EACH_LIMIT = "each_limit";
public static final String KEY_ALPHA = "alpha";
public static final String KEY_WORKERS = "workers";
public static final long DEFAULT_CAPACITY = 10000000L;
public static final long DEFAULT_LIMIT = 100L;
public static final long DEFAULT_EACH_LIMIT = 1L;
public static final long DEFAULT_DEGREE = 100L;
public static final long DEFAULT_SAMPLE = 1L;
public static final long DEFAULT_TIMES = 20L;
public static final long DEFAULT_STABLE_TIMES= 3L;
public static final double DEFAULT_PRECISION = 1.0 / 1000;
public static final double DEFAULT_ALPHA = 0.5D;
public static final String C_LABEL = "c_label";
public static final String R_RANK = "r_rank";
@Override
public void checkParameters(Map<String, Object> parameters) {
E.checkArgument(parameters.isEmpty(),
"Unnecessary parameters: %s", parameters);
}
protected static int depth(Map<String, Object> parameters) {
int depth = ParameterUtil.parameterInt(parameters, KEY_DEPTH);
E.checkArgument(depth > 0,
"The value of %s must be > 0, but got %s",
KEY_DEPTH, depth);
return depth;
}
protected static String edgeLabel(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_LABEL)) {
return null;
}
return ParameterUtil.parameterString(parameters, KEY_LABEL);
}
protected static Directions direction(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_DIRECTION)) {
return Directions.BOTH;
}
Object direction = ParameterUtil.parameter(parameters, KEY_DIRECTION);
return parseDirection(direction);
}
protected static Directions direction4Out(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_DIRECTION)) {
return Directions.OUT;
}
Object direction = ParameterUtil.parameter(parameters, KEY_DIRECTION);
return parseDirection(direction);
}
protected static Directions directionOutIn(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_DIRECTION)) {
return Directions.OUT;
}
Object direction = ParameterUtil.parameter(parameters, KEY_DIRECTION);
Directions dir = parseDirection(direction);
E.checkArgument(dir == Directions.OUT || dir == Directions.IN,
"The value of %s must be either OUT or IN, but got: %s",
KEY_DIRECTION, dir);
return dir;
}
protected static double alpha(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_ALPHA)) {
return DEFAULT_ALPHA;
}
double alpha = ParameterUtil.parameterDouble(parameters, KEY_ALPHA);
E.checkArgument(alpha > 0.0 && alpha <= 1.0,
"The value of %s must be in range (0, 1], but got %s",
KEY_ALPHA, alpha);
return alpha;
}
protected static long top(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_TOP)) {
return 0L;
}
long top = ParameterUtil.parameterLong(parameters, KEY_TOP);
HugeTraverser.checkNonNegativeOrNoLimit(top, KEY_TOP);
return top;
}
protected static long degree(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_DEGREE)) {
return DEFAULT_DEGREE;
}
long degree = ParameterUtil.parameterLong(parameters, KEY_DEGREE);
HugeTraverser.checkDegree(degree);
return degree;
}
protected static long capacity(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_CAPACITY)) {
return DEFAULT_CAPACITY;
}
long capacity = ParameterUtil.parameterLong(parameters, KEY_CAPACITY);
HugeTraverser.checkCapacity(capacity);
return capacity;
}
protected static long limit(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_LIMIT)) {
return DEFAULT_LIMIT;
}
long limit = ParameterUtil.parameterLong(parameters, KEY_LIMIT);
HugeTraverser.checkLimit(limit);
return limit;
}
protected static long eachLimit(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_EACH_LIMIT)) {
return DEFAULT_EACH_LIMIT;
}
long limit = ParameterUtil.parameterLong(parameters, KEY_EACH_LIMIT);
HugeTraverser.checkPositiveOrNoLimit(limit, KEY_EACH_LIMIT);
return limit;
}
protected static long sample(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_SAMPLE)) {
return DEFAULT_SAMPLE;
}
long sample = ParameterUtil.parameterLong(parameters, KEY_SAMPLE);
HugeTraverser.checkPositiveOrNoLimit(sample, KEY_SAMPLE);
return sample;
}
protected static long sourceSample(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_SOURCE_SAMPLE)) {
return HugeTraverser.NO_LIMIT;
}
long sample = ParameterUtil.parameterLong(parameters,
KEY_SOURCE_SAMPLE);
HugeTraverser.checkPositiveOrNoLimit(sample, KEY_SOURCE_SAMPLE);
return sample;
}
protected static String sourceLabel(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_SOURCE_LABEL)) {
return null;
}
return ParameterUtil.parameterString(parameters, KEY_SOURCE_LABEL);
}
protected static String sourceCLabel(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_SOURCE_CLABEL)) {
return null;
}
return ParameterUtil.parameterString(parameters, KEY_SOURCE_CLABEL);
}
protected static int workers(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_WORKERS)) {
return -1;
}
int workers = ParameterUtil.parameterInt(parameters, KEY_WORKERS);
HugeTraverser.checkNonNegativeOrNoLimit(workers, KEY_WORKERS);
return workers;
}
protected static Directions parseDirection(Object direction) {
if (direction.equals(Directions.BOTH.toString())) {
return Directions.BOTH;
} else if (direction.equals(Directions.OUT.toString())) {
return Directions.OUT;
} else if (direction.equals(Directions.IN.toString())) {
return Directions.IN;
} else {
throw new IllegalArgumentException(String.format(
"The value of direction must be in [OUT, IN, BOTH], " +
"but got '%s'", direction));
}
}
public static class AlgoTraverser extends HugeTraverser
implements AutoCloseable {
private final UserJob<Object> job;
protected final ExecutorService executor;
protected long progress;
public AlgoTraverser(UserJob<Object> job) {
super(job.graph());
this.job = job;
this.executor = null;
}
protected AlgoTraverser(UserJob<Object> job, String name, int workers) {
super(job.graph());
this.job = job;
String prefix = name + "-" + this.jobId();
this.executor = Consumers.newThreadPool(prefix, workers);
}
public void updateProgress(long progress) {
this.job.updateProgress((int) progress);
}
public Id jobId() {
return this.job.task().id();
}
@Override
public void close() {
if (this.executor != null) {
this.executor.shutdown();
}
}
protected long traverse(String sourceLabel, String sourceCLabel,
Consumer<Vertex> consumer) {
return this.traverse(sourceLabel, sourceCLabel, consumer, null);
}
protected long traverse(String sourceLabel, String sourceCLabel,
Consumer<Vertex> consumer, Runnable done) {
return this.traverse(sourceLabel, sourceCLabel, consumer, done,
NO_LIMIT);
}
protected long traverse(String sourceLabel, String sourceCLabel,
Consumer<Vertex> consumer, Runnable done,
long limit) {
long actualLimit = limit == NO_LIMIT ? Query.NO_LIMIT : limit;
Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel,
actualLimit);
Consumers<Vertex> consumers = new Consumers<>(this.executor,
consumer, done);
consumers.start("task-" + this.job.task().id());
long total = 0L;
try {
while (vertices.hasNext()) {
this.updateProgress(++this.progress);
total++;
Vertex v = vertices.next();
consumers.provide(v);
}
} catch (StopExecution e) {
// pass
} catch (Throwable e) {
throw Consumers.wrapException(e);
} finally {
consumers.await();
CloseableIterator.closeIterator(vertices);
}
return total;
}
protected Iterator<Vertex> vertices() {
return this.vertices(Query.NO_LIMIT);
}
protected Iterator<Vertex> vertices(long limit) {
Query query = new Query(HugeType.VERTEX);
query.capacity(Query.NO_CAPACITY);
query.limit(limit);
return this.graph().vertices(query);
}
protected Iterator<Vertex> vertices(Object label, Object clabel,
long limit) {
return vertices(label, C_LABEL, clabel, limit);
}
protected Iterator<Vertex> vertices(Object label, String key,
Object value, long limit) {
Iterator<Vertex> vertices = this.vertices(label, limit);
if (value != null) {
vertices = filter(vertices, key, value);
}
return vertices;
}
protected Iterator<Vertex> vertices(Object label, long limit) {
if (label == null) {
return this.vertices(limit);
}
ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
query.capacity(Query.NO_CAPACITY);
query.limit(limit);
query.eq(HugeKeys.LABEL, this.getVertexLabelId(label));
return this.graph().vertices(query);
}
protected Iterator<Vertex> vertices(Iterator<Object> ids) {
return new FlatMapperIterator<>(ids, id -> {
return this.graph().vertices(id);
});
}
protected Vertex vertex(Object id) {
Iterator<Vertex> iter = this.graph().vertices(id);
if (!iter.hasNext()) {
return null;
}
return iter.next();
}
protected Iterator<Vertex> filter(Iterator<Vertex> vertices,
String key, Object value) {
return new FilterIterator<>(vertices, vertex -> {
boolean matched = match(vertex, key, value);
if (!matched) {
this.updateProgress(++this.progress);
}
return matched;
});
}
protected boolean match(Element elem, Object clabel) {
return match(elem, C_LABEL, clabel);
}
protected boolean match(Element elem, String key, Object value) {
// check property key exists
this.graph().propertyKey(key);
// return true if property value exists & equals to specified value
Property<Object> p = elem.property(key);
return p.isPresent() && Objects.equal(p.value(), value);
}
protected Iterator<Edge> edges(Directions dir) {
HugeType type = dir == null ? HugeType.EDGE : dir.type();
Query query = new Query(type);
query.capacity(Query.NO_CAPACITY);
query.limit(Query.NO_LIMIT);
return this.graph().edges(query);
}
protected void drop(GraphTraversal<?, ? extends Element> traversal) {
this.execute(traversal, () -> {
while (traversal.hasNext()) {
this.updateProgress(++this.progress);
traversal.next().remove();
this.commitIfNeeded();
}
return null;
});
this.graph().tx().commit();
}
protected <V> V execute(GraphTraversal<?, ?> traversal,
Callable<V> callback) {
long capacity = Query.defaultCapacity(MAX_QUERY_LIMIT);
try {
return callback.call();
} catch (Exception e) {
throw new HugeException("Failed to execute algorithm: %s",
e, e.getMessage());
} finally {
Query.defaultCapacity(capacity);
try {
traversal.close();
} catch (Exception e) {
throw new HugeException("Can't close traversal", e);
}
}
}
protected <V extends Number> Number tryNext(GraphTraversal<?, V> iter) {
return this.execute(iter, () -> {
try {
return iter.next();
} catch (NoSuchElementException e) {
return 0;
}
});
}
protected void commitIfNeeded() {
// commit if needed
Transaction tx = this.graph().tx();
Whitebox.invoke(tx.getClass(), "commitIfGtSize", tx, BATCH);
}
}
public static final class TopMap<K> {
private final long topN;
private Map<K, MutableLong> tops;
public TopMap(long topN) {
this.topN = topN;
this.tops = new HashMap<>();
}
public int size() {
return this.tops.size();
}
public MutableLong get(K key) {
return this.tops.get(key);
}
public void add(K key, long value) {
MutableLong mlong = this.tops.get(key);
if (mlong == null) {
mlong = new MutableLong(value);
this.tops.put(key, mlong);
}
mlong.add(value);
}
public void put(K key, long value) {
assert this.topN != 0L;
this.tops.put(key, new MutableLong(value));
// keep 2x buffer
if (this.tops.size() > this.topN * 2 &&
this.topN != HugeTraverser.NO_LIMIT) {
this.shrinkIfNeeded(this.topN);
}
}
public Set<Map.Entry<K, MutableLong>> entrySet() {
if (this.tops.size() <= this.topN) {
this.tops = CollectionUtil.sortByValue(this.tops, false);
} else {
this.shrinkIfNeeded(this.topN);
}
return this.tops.entrySet();
}
private void shrinkIfNeeded(long limit) {
assert limit != 0L;
if (this.tops.size() >= limit &&
(limit > 0L || limit == HugeTraverser.NO_LIMIT)) {
// Just do sort if limit=NO_LIMIT, else do sort and shrink
this.tops = HugeTraverser.topN(this.tops, true, limit);
}
}
}
public static final class JsonMap {
private final StringBuilder json;
public JsonMap() {
this(4 * (int) Bytes.KB);
}
public JsonMap(int initCapacity) {
this.json = new StringBuilder(initCapacity);
}
public void startObject() {
this.json.append('{');
}
public void endObject() {
this.deleteLastComma();
this.json.append('}');
}
public void startList() {
this.json.append('[');
}
public void endList() {
this.deleteLastComma();
this.json.append(']');
}
public void deleteLastComma() {
int last = this.json.length() - 1;
if (last >= 0 && this.json.charAt(last) == ',') {
this.json.deleteCharAt(last);
}
}
public void appendKey(String key) {
this.appendString(key).append(':');
}
public void append(long value) {
this.json.append(value).append(',');
this.checkSizeLimit();
}
public void append(String value) {
this.appendString(value).append(',');
this.checkSizeLimit();
}
public void append(Object key, long value) {
this.append(key.toString(), value);
}
public void append(String key, long value) {
this.appendString(key).append(':');
this.json.append(value).append(',');
this.checkSizeLimit();
}
public void append(Object key, Number value) {
this.append(key.toString(), value);
}
public void append(String key, Number value) {
this.appendString(key).append(':');
this.json.append(value).append(',');
this.checkSizeLimit();
}
public void append(String key, String value) {
this.appendString(key).append(':');
this.appendString(value).append(',');
this.checkSizeLimit();
}
public void appendRaw(String key, String rawJson) {
this.appendString(key).append(':');
this.json.append(rawJson).append(',');
this.checkSizeLimit();
}
public void appendRaw(String rawJson) {
this.json.append(rawJson).append(',');
this.checkSizeLimit();
}
public void append(Set<Entry<Id, MutableLong>> kvs) {
for (Map.Entry<Id, MutableLong> top : kvs) {
this.append(top.getKey(), top.getValue());
}
}
private StringBuilder appendString(String str) {
if (str.indexOf('"') >= 0) {
str = StringEscapeUtils.escapeJson(str);
}
return this.json.append('"').append(str).append('"');
}
public void checkSizeLimit() {
E.checkArgument(this.json.length() < MAX_RESULT_SIZE,
"The result size exceeds limit %s",
MAX_RESULT_SIZE);
}
public Object asJson() {
return JsonUtil.asJson(this.json.toString());
}
}
}