blob: 6e9292128494e51e33bd59525721df863a402aaf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hugegraph.job.algorithm.comm;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.job.UserJob;
import org.apache.hugegraph.schema.SchemaManager;
import org.apache.hugegraph.schema.VertexLabel;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.util.E;
import org.apache.tinkerpop.gremlin.process.traversal.Scope;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import com.google.common.collect.ImmutableMap;
public class LpaAlgorithm extends AbstractCommAlgorithm {
/** Name of this algorithm; returned by name() and handed to the traverser's super constructor. */
public static final String ALGO_NAME = "lpa";
/**
 * Returns the name of this algorithm.
 *
 * @return the value of {@code ALGO_NAME}
 */
@Override
public String name() {
    return LpaAlgorithm.ALGO_NAME;
}
/**
 * Checks the job parameters by invoking every parameter accessor once and
 * discarding the returned value.
 *
 * NOTE(review): the accessors are defined outside this file; presumably each
 * one throws when its parameter is invalid, in which case the call order
 * decides which error is reported first - confirm before reordering.
 *
 * @param parameters the raw job parameters
 */
@Override
public void checkParameters(Map<String, Object> parameters) {
times(parameters);
precision(parameters);
sourceLabel(parameters);
edgeLabel(parameters);
direction(parameters);
degree(parameters);
showCommunity(parameters);
workers(parameters);
}
/**
 * Executes the job.
 *
 * If the show-community parameter is set, the members of that community are
 * returned; otherwise the label propagation itself is run with the remaining
 * parameters.
 *
 * On any failure the graph transaction is rolled back and the original error
 * is rethrown. Should the rollback fail as well, its error is attached to the
 * original one as a suppressed exception instead of replacing it.
 *
 * @param job        the job this algorithm runs in
 * @param parameters the raw job parameters
 * @return the result of {@code Traverser.showCommunity} or {@code Traverser.lpa}
 */
@Override
public Object call(UserJob<Object> job, Map<String, Object> parameters) {
    int workers = workers(parameters);
    String showComm = showCommunity(parameters);

    // try-with-resources closes the traverser before the catch block runs
    try (Traverser traverser = new Traverser(job, workers)) {
        if (showComm != null) {
            return traverser.showCommunity(showComm);
        }
        return traverser.lpa(sourceLabel(parameters),
                             edgeLabel(parameters),
                             direction(parameters),
                             degree(parameters),
                             times(parameters),
                             precision(parameters));
    } catch (Throwable e) {
        try {
            job.graph().tx().rollback();
        } catch (Throwable rollbackError) {
            // Keep the original failure as the primary error; a failing
            // rollback must not hide the reason the job broke.
            if (rollbackError != e) {
                e.addSuppressed(rollbackError);
            }
        }
        throw e;
    }
}
private static class Traverser extends AlgoTraverser {
// Limit passed to vertices() when listing the members of a community
private static final long LIMIT = MAX_QUERY_LIMIT;
// Random source used to pick one label among those with the highest frequency
private final Random R = new Random();
/**
 * Creates a traverser for the given job.
 *
 * @param job     the job handed on to the base traverser
 * @param workers the worker count handed on to the base traverser
 */
public Traverser(UserJob<Object> job, int workers) {
super(job, ALGO_NAME, workers);
}
/**
 * Runs label propagation rounds until the changed ratio reported by
 * detectCommunities() drops to {@code precision} or below, or until
 * {@code maxTimes} rounds have been executed.
 *
 * @param sourceLabel passed through to detectCommunities()
 * @param edgeLabel   passed through to detectCommunities()
 * @param dir         passed through to detectCommunities()
 * @param degree      passed through to detectCommunities()
 * @param maxTimes    maximum number of rounds, asserted to be positive
 * @param precision   changed-ratio threshold, asserted to be positive
 * @return a map with the keys "iteration_times", "last_precision",
 *         "times" and "communities"
 */
public Object lpa(String sourceLabel, String edgeLabel,
                  Directions dir, long degree,
                  int maxTimes, double precision) {
    assert maxTimes > 0;
    assert precision > 0d;

    this.initSchema();

    // Reported round count; stays at maxTimes unless a round stabilizes
    int usedTimes = maxTimes;
    double lastChanged = 0d;
    int round = 0;
    while (round < maxTimes) {
        lastChanged = this.detectCommunities(sourceLabel, edgeLabel,
                                             dir, degree);
        round++;
        if (lastChanged <= precision) {
            // stabilized: remember how many rounds were needed
            usedTimes = round;
            break;
        }
    }

    // Count the distinct community labels among vertices carrying one
    Number communities = tryNext(this.graph().traversal().V()
                                     .filter(__.properties(C_LABEL))
                                     .groupCount().by(C_LABEL)
                                     .count(Scope.local));

    return ImmutableMap.of("iteration_times", usedTimes,
                           "last_precision", lastChanged,
                           "times", maxTimes,
                           "communities", communities);
}
/**
 * Lists the ids of the vertices that carry the given community label.
 * At most {@code LIMIT} is passed to vertices() as the query limit.
 *
 * @param clabel the community label to look up, must not be null
 * @return the JSON produced by {@code JsonMap.asJson()} for the id list
 */
public Object showCommunity(String clabel) {
    E.checkNotNull(clabel, "clabel");

    // every vertex tagged with the requested community label
    Iterator<Vertex> members = this.vertices(null, clabel, LIMIT);

    JsonMap result = new JsonMap();
    result.startList();
    while (members.hasNext()) {
        // report progress before the next vertex is fetched
        this.progress++;
        this.updateProgress(this.progress);
        Vertex member = members.next();
        result.append(member.id().toString());
    }
    result.endList();

    return result.asJson();
}
/**
 * Performs one propagation round: every vertex handed out by traverse()
 * votes for a community label and is updated when the label changes.
 * Vertices are processed in the order traverse() delivers them; no
 * shuffling or sampling is applied.
 *
 * @param sourceLabel passed to traverse() to select the vertices
 * @param edgeLabel   passed through to voteCommunityAndUpdate()
 * @param dir         passed through to voteCommunityAndUpdate()
 * @param degree      passed through to voteCommunityAndUpdate()
 * @return the number of updated vertices divided by the count returned by
 *         traverse(), or 0 when that count is 0
 */
private double detectCommunities(String sourceLabel, String edgeLabel,
                                 Directions dir, long degree) {
    // Atomic because the consumer below is called by multiple threads
    AtomicLong changedCount = new AtomicLong(0L);

    long visited = this.traverse(sourceLabel, null, vertex -> {
        boolean relabeled = this.voteCommunityAndUpdate(vertex, edgeLabel,
                                                        dir, degree);
        if (relabeled) {
            changedCount.incrementAndGet();
        }
    }, () -> {
        // commit once the traversal has finished
        this.graph().tx().commit();
    });

    if (visited == 0L) {
        return 0d;
    }
    return changedCount.doubleValue() / visited;
}
/**
 * Votes for the community label of a vertex and stores it when the vertex
 * has no label property yet or the voted label differs from the stored one.
 *
 * @param vertex    the vertex to (re)label
 * @param edgeLabel passed through to voteCommunityOfVertex()
 * @param dir       passed through to voteCommunityOfVertex()
 * @param degree    passed through to voteCommunityOfVertex()
 * @return true if the label was written, false if it was left untouched
 */
private boolean voteCommunityAndUpdate(Vertex vertex, String edgeLabel,
                                       Directions dir, long degree) {
    String voted = this.voteCommunityOfVertex(vertex, edgeLabel,
                                              dir, degree);

    // nothing to do when the stored label already equals the voted one
    boolean unchanged = labelPresent(vertex) &&
                        voted.equals(labelOfVertex(vertex));
    if (unchanged) {
        return false;
    }

    this.updateLabelOfVertex(vertex, voted);
    return true;
}
/**
 * Chooses a community label for a vertex from the labels of its adjacent
 * vertices: the most frequent label wins, and ties are broken randomly.
 * The vertex itself does not take part in the vote (per the original
 * author's note, including it yields a larger number of small communities).
 *
 * @param vertex    the vertex to vote for
 * @param edgeLabel edge label name, resolved via getEdgeLabelId()
 * @param dir       direction passed to adjacentVertices()
 * @param degree    degree passed to adjacentVertices()
 * @return the winning label, or the vertex's own current label when no
 *         adjacent vertex contributed a label
 */
private String voteCommunityOfVertex(Vertex vertex, String edgeLabel,
                                     Directions dir, long degree) {
    Id sourceId = (Id) vertex.id();
    Id edgeLabelId = this.getEdgeLabelId(edgeLabel);
    Iterator<Id> neighborIds = this.adjacentVertices(sourceId, dir,
                                                     edgeLabelId, degree);

    // label -> number of adjacent vertices carrying it
    Map<String, MutableInt> frequencies = new HashMap<>();
    while (neighborIds.hasNext()) {
        String neighborLabel = this.labelOfVertex(neighborIds.next());
        // a null label means the adjacent vertex could not be loaded
        if (neighborLabel != null) {
            frequencies.computeIfAbsent(neighborLabel,
                                        key -> new MutableInt(0))
                       .increment();
        }
    }

    // no votes at all: keep the vertex's own label
    if (frequencies.isEmpty()) {
        return this.labelOfVertex(vertex);
    }

    // first pass: highest frequency
    int topFreq = 0;
    for (MutableInt count : frequencies.values()) {
        topFreq = Math.max(topFreq, count.intValue());
    }

    // second pass: all labels that reach it
    List<String> candidates = new ArrayList<>();
    for (Map.Entry<String, MutableInt> entry : frequencies.entrySet()) {
        if (entry.getValue().intValue() == topFreq) {
            candidates.add(entry.getKey());
        }
    }

    /*
     * TODO:
     * keep origin label with probability to prevent monster communities
     */
    // pick one of the top labels at random
    int pick = this.R.nextInt(candidates.size());
    return candidates.get(pick);
}
/**
 * Tells whether the vertex has the C_LABEL property set.
 *
 * @param vertex the vertex to inspect
 * @return true if the C_LABEL property is present on the vertex
 */
private boolean labelPresent(Vertex vertex) {
return vertex.property(C_LABEL).isPresent();
}
/**
 * Returns the community label of a vertex.
 *
 * @param vertex the vertex to read
 * @return the value of the C_LABEL property if present, otherwise the
 *         string form of the vertex id
 */
private String labelOfVertex(Vertex vertex) {
    if (labelPresent(vertex)) {
        return vertex.value(C_LABEL);
    }
    // not labelled yet: the vertex's own id serves as its label
    return vertex.id().toString();
}
/**
 * Looks up a vertex by id and returns its community label.
 *
 * @param vid the id of the vertex to look up
 * @return the label of the vertex, or null if vertex() returned null
 */
private String labelOfVertex(Id vid) {
    // TODO: cache with Map<Id, String>
    Vertex found = this.vertex(vid);
    return found == null ? null : this.labelOfVertex(found);
}
/**
 * Writes the community label to the vertex's C_LABEL property and then
 * calls commitIfNeeded().
 *
 * @param v     the vertex to update
 * @param label the label to store
 */
private void updateLabelOfVertex(Vertex v, String label) {
// TODO: cache with Map<Id, String>
v.property(C_LABEL, label);
this.commitIfNeeded();
}
/**
 * Prepares the schema for storing community labels: creates the C_LABEL
 * text property key if it does not exist, then appends it as a nullable
 * property to every vertex label returned by the schema manager.
 */
private void initSchema() {
    SchemaManager schemaManager = this.graph().schema();

    schemaManager.propertyKey(C_LABEL).asText().ifNotExist().create();

    for (VertexLabel vertexLabel : schemaManager.getVertexLabels()) {
        String labelName = vertexLabel.name();
        schemaManager.vertexLabel(labelName)
                     .properties(C_LABEL)
                     .nullableKeys(C_LABEL)
                     .append();
    }
}
}
}