/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.job.algorithm;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.util.ParameterUtil;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.slf4j.Logger;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.StandardHugeGraph;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.job.UserJob;
import org.apache.hugegraph.task.HugeTask;
import org.apache.hugegraph.traversal.algorithm.HugeTraverser;
import org.apache.hugegraph.traversal.optimize.HugeScriptTraversal;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import com.google.common.collect.ImmutableMap;
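/**
 * Computes a bundle of statistics over a subgraph: vertex/edge counts,
 * degree/stress/betweenness/eigenvector/closeness centrality, page ranks,
 * cluster coefficient and ring count. The subgraph is materialized in a
 * temporary in-memory graph from a caller-supplied Gremlin-Groovy script,
 * optionally together with a copy of the parent graph's schema.
 *
 * Parameters:
 *   "subgraph"    - required Gremlin-Groovy script executed against the temp
 *                   graph, typically addV()/addE() steps that build the
 *                   subgraph to analyze
 *   "copy_schema" - whether to copy the parent graph's schema into the temp
 *                   graph before running the script (default false)
 *
 * For illustration only (the real script is supplied by the caller), the
 * parameters might look like:
 *   {"subgraph": "g.addV('person').iterate()", "copy_schema": true}
 */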
public class SubgraphStatAlgorithm extends AbstractAlgorithm {
public static final String KEY_SUBGRAPH = "subgraph";
public static final String KEY_COPY_SCHEMA = "copy_schema";
private static final Logger LOG = Log.logger(SubgraphStatAlgorithm.class);
@Override
public String name() {
return "subgraph_stat";
}
@Override
public String category() {
return CATEGORY_AGGR;
}
@Override
public void checkParameters(Map<String, Object> parameters) {
subgraph(parameters);
}
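    /**
     * Creates a temporary in-memory graph, initializes it from the parent
     * graph (optional schema copy plus the 'subgraph' script), runs the
     * statistics via {@link Traverser#subgraphStat}, and finally clears and
     * closes the temp graph.
     */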
@Override
public Object call(UserJob<Object> job, Map<String, Object> parameters) {
HugeGraph graph = this.createTempGraph(job);
try (Traverser traverser = new Traverser(job)) {
this.initGraph(job.graph(), graph,
subgraph(parameters), copySchema(parameters));
UserJob<Object> tmpJob = new TempJob<>(graph, job, job.task());
return traverser.subgraphStat(tmpJob);
} finally {
            // Use clearBackend() instead of truncateBackend() because the
            // temp graph has no server-id
graph.clearBackend();
try {
graph.close();
} catch (Throwable e) {
LOG.warn("Can't close subgraph_stat temp graph {}: {}",
graph, e.getMessage(), e);
}
}
}
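    /**
     * Builds a memory-backed StandardHugeGraph named "tmp_<task-id>" to hold
     * the subgraph while the statistics are computed.
     */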
private HugeGraph createTempGraph(UserJob<Object> job) {
Id id = job.task().id();
String name = "tmp_" + id;
PropertiesConfiguration config = new PropertiesConfiguration();
config.setProperty(CoreOptions.BACKEND.name(), "memory");
config.setProperty(CoreOptions.STORE.name(), name);
        /*
         * NOTE: this temp graph doesn't need to init its backend because no
         * task info is required; also don't mark it as started because there
         * is no task to be scheduled on it.
         */
return new StandardHugeGraph(new HugeConfig(config));
}
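    /**
     * Optionally copies the parent graph's schema into the temp graph, then
     * executes the Gremlin-Groovy subgraph script against it and commits.
     */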
@SuppressWarnings("resource")
private void initGraph(HugeGraph parent, HugeGraph graph,
String script, boolean copySchema) {
if (copySchema) {
graph.schema().copyFrom(parent.schema());
}
new HugeScriptTraversal<>(graph.traversal(), "gremlin-groovy",
script, ImmutableMap.of(),
ImmutableMap.of()).iterate();
graph.tx().commit();
}
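    /**
     * Extracts the required 'subgraph' script parameter, checking that it is
     * present and of type String.
     */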
protected static String subgraph(Map<String, Object> parameters) {
Object subgraph = parameters.get(KEY_SUBGRAPH);
E.checkArgument(subgraph != null,
"Must pass parameter '%s'", KEY_SUBGRAPH);
E.checkArgument(subgraph instanceof String,
"Invalid parameter '%s', expect a String, but got %s",
KEY_SUBGRAPH, subgraph.getClass().getSimpleName());
return (String) subgraph;
}
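    /**
     * Extracts the optional 'copy_schema' parameter, defaulting to false
     * when it is absent.
     */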
protected static boolean copySchema(Map<String, Object> parameters) {
if (!parameters.containsKey(KEY_COPY_SCHEMA)) {
return false;
}
return ParameterUtil.parameterBoolean(parameters, KEY_COPY_SCHEMA);
}
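    /**
     * Runs the individual algorithms from the AlgorithmPool against the temp
     * graph and collects their results into a single ordered map.
     */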
private static class Traverser extends AlgoTraverser {
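        // Shared parameters passed to each sub-algorithm: depth=10, no limit
        // on degree/sample/top, workers=0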
private static final Map<String, Object> PARAMS = ImmutableMap.of("depth", 10L,
"degree", -1L,
"sample", -1L,
"top", -1L /* sorted */,
"workers", 0);
public Traverser(UserJob<Object> job) {
super(job);
}
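        /**
         * Collects the subgraph statistics: vertex and edge counts, the
         * centrality measures, page ranks, cluster coefficient and ring
         * count, keyed by result name in insertion order.
         */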
public Object subgraphStat(UserJob<Object> job) {
AlgorithmPool pool = AlgorithmPool.instance();
Map<String, Object> results = InsertionOrderUtil.newMap();
GraphTraversalSource g = job.graph().traversal();
results.put("vertices_count", g.V().count().next());
results.put("edges_count", g.E().count().next());
Algorithm algo = pool.get("degree_centrality");
Map<String, Object> parameters = ImmutableMap.copyOf(PARAMS);
results.put("degrees", algo.call(job, parameters));
algo = pool.get("stress_centrality");
results.put("stress", algo.call(job, parameters));
algo = pool.get("betweenness_centrality");
results.put("betweenness", algo.call(job, parameters));
algo = pool.get("eigenvector_centrality");
results.put("eigenvectors", algo.call(job, parameters));
algo = pool.get("closeness_centrality");
results.put("closeness", algo.call(job, parameters));
results.put("page_ranks", pageRanks(job));
algo = pool.get("cluster_coefficient");
results.put("cluster_coefficient", algo.call(job, parameters));
algo = pool.get("rings");
parameters = ImmutableMap.<String, Object>builder()
.putAll(PARAMS)
.put("count_only", true)
.put("each_limit", NO_LIMIT)
.put("limit", NO_LIMIT)
.build();
results.put("rings", algo.call(job, parameters));
return results;
}
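        /**
         * Runs the page_rank algorithm with alpha = 0.15, then reads the
         * resulting rank property from every vertex and returns the ranks
         * sorted via topN() with no limit.
         */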
private Map<Object, Double> pageRanks(UserJob<Object> job) {
Algorithm algo = AlgorithmPool.instance().get("page_rank");
algo.call(job, ImmutableMap.of("alpha", 0.15));
// Collect page ranks
Map<Object, Double> ranks = InsertionOrderUtil.newMap();
Iterator<Vertex> vertices = job.graph().vertices();
while (vertices.hasNext()) {
Vertex vertex = vertices.next();
ranks.put(vertex.id(), vertex.value(R_RANK));
}
ranks = HugeTraverser.topN(ranks, true, NO_LIMIT);
return ranks;
}
}
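    /**
     * A lightweight UserJob bound to the temp graph: it executes nothing
     * itself and forwards progress updates to the parent job so the original
     * task still reports progress.
     */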
private static class TempJob<V> extends UserJob<V> {
private final UserJob<V> parent;
public TempJob(HugeGraph graph, UserJob<V> job, HugeTask<V> task) {
this.graph(graph);
this.task(task);
this.parent = job;
}
@Override
public String type() {
return "temp";
}
@Override
public V execute() throws Exception {
return null;
}
@Override
public void updateProgress(int progress) {
this.parent.updateProgress(progress);
}
}
}