blob: fbe139005a693fa611822dd030ffd1fd91b45c70 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.job.algorithm.comm;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hugegraph.backend.id.Id;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.job.UserJob;
import org.apache.hugegraph.traversal.algorithm.FusiformSimilarityTraverser;
import org.apache.hugegraph.type.define.Directions;
import org.apache.hugegraph.util.CollectionUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.JsonUtil;
import org.apache.hugegraph.util.ParameterUtil;
import com.google.common.collect.ImmutableSet;
public class KCoreAlgorithm extends AbstractCommAlgorithm {
public static final String ALGO_NAME = "k_core";
public static final String KEY_K = "k";
public static final String KEY_MERGED = "merged";
public static final int DEFAULT_K = 3;
/**
 * Returns the identifier of this algorithm.
 *
 * @return the value of {@link #ALGO_NAME}
 */
@Override
public String name() {
    final String algorithmName = ALGO_NAME;
    return algorithmName;
}
/**
 * Checks the job parameters by invoking every parameter accessor used by
 * this algorithm and discarding the parsed values.
 *
 * NOTE(review): the accessors other than k() and merged() are defined
 * outside this file; presumably each one throws when its parameter is
 * invalid (k() does so through E.checkArgument) - confirm.
 *
 * @param parameters the raw job parameters keyed by parameter name
 */
@Override
public void checkParameters(Map<String, Object> parameters) {
    // Parameters specific to k-core
    k(parameters);
    alpha(parameters);
    merged(parameters);
    degree(parameters);

    // Source vertex selection
    sourceLabel(parameters);
    sourceCLabel(parameters);

    // Edge selection
    direction(parameters);
    edgeLabel(parameters);

    // Execution settings
    workers(parameters);
}
/**
 * Runs the k-core computation for the given job.
 *
 * A Traverser is created with the requested number of workers and is
 * closed by the try-with-resources statement when the computation ends,
 * whether it returns normally or throws.
 *
 * @param job        the job this algorithm runs for
 * @param parameters the raw job parameters keyed by parameter name
 * @return whatever Traverser.kcore() returns
 */
@Override
public Object call(UserJob<Object> job, Map<String, Object> parameters) {
    int workerCount = workers(parameters);
    try (Traverser kcoreTraverser = new Traverser(job, workerCount)) {
        /*
         * Parse the arguments only after the traverser exists and in the
         * same order as they are passed, so that a parsing failure still
         * goes through the traverser's close().
         */
        String sourceLabel = sourceLabel(parameters);
        String sourceCLabel = sourceCLabel(parameters);
        Directions direction = direction(parameters);
        String edgeLabel = edgeLabel(parameters);
        int k = k(parameters);
        double alpha = alpha(parameters);
        long degree = degree(parameters);
        boolean merged = merged(parameters);

        return kcoreTraverser.kcore(sourceLabel, sourceCLabel, direction,
                                    edgeLabel, k, alpha, degree, merged);
    }
}
/**
 * Reads the "k" parameter.
 *
 * @param parameters the raw job parameters keyed by parameter name
 * @return {@link #DEFAULT_K} when the key {@link #KEY_K} is absent,
 *         otherwise the parsed value
 *
 * A present value that is not greater than 1 is rejected through
 * E.checkArgument (presumably an IllegalArgumentException - confirm).
 */
protected static int k(Map<String, Object> parameters) {
    if (parameters.containsKey(KEY_K)) {
        int value = ParameterUtil.parameterInt(parameters, KEY_K);
        E.checkArgument(value > 1,
                        "The k of kcore must be > 1, but got %s", value);
        return value;
    }
    return DEFAULT_K;
}
/**
 * Reads the "merged" flag.
 *
 * @param parameters the raw job parameters keyed by parameter name
 * @return false when the key {@link #KEY_MERGED} is absent, otherwise the
 *         value parsed by ParameterUtil.parameterBoolean()
 */
protected static boolean merged(Map<String, Object> parameters) {
    // Short-circuit: the value is only parsed when the key is present
    return parameters.containsKey(KEY_MERGED) &&
           ParameterUtil.parameterBoolean(parameters, KEY_MERGED);
}
/**
 * Job-side traverser: computes a k-core for each vertex handed over by
 * traverse() and renders all of them as one JSON document.
 */
private static class Traverser extends AlgoTraverser {

    public Traverser(UserJob<Object> job, int workers) {
        super(job, ALGO_NAME, workers);
    }

    /**
     * Computes the k-core of every vertex supplied by
     * traverse(sourceLabel, sourceCLabel, ...) and returns them under the
     * "kcores" key of a JSON object.
     *
     * When {@code merged} is false each non-empty k-core is appended to
     * the output as soon as it is found. When it is true, k-cores that
     * share at least one vertex are first unioned and the unions are
     * appended after the traversal has finished.
     *
     * NOTE(review): the callback guards shared state with synchronized
     * blocks, so it is presumably invoked from several worker threads -
     * confirm against AlgoTraverser.traverse().
     *
     * @return the result of JsonMap.asJson()
     */
    public Object kcore(String sourceLabel, String sourceCLabel,
                        Directions dir, String label, int k, double alpha,
                        long degree, boolean merged) {
        KcoreTraverser finder = new KcoreTraverser(this.graph());

        JsonMap output = new JsonMap();
        output.startObject();
        output.appendKey("kcores");
        output.startList();

        Set<Set<Id>> mergedKcores = new HashSet<>();

        this.traverse(sourceLabel, sourceCLabel, vertex -> {
            Set<Id> found = finder.kcore(IteratorUtils.of(vertex),
                                         dir, label, k, alpha, degree);
            if (found.isEmpty()) {
                return;
            }
            if (!merged) {
                // Serialize outside the lock, append inside it
                String raw = JsonUtil.toJson(found);
                synchronized (output) {
                    output.appendRaw(raw);
                }
                return;
            }
            synchronized (mergedKcores) {
                mergeKcores(mergedKcores, found);
            }
        });

        if (merged) {
            // The unions are only final once the traversal is over
            for (Set<Id> union : mergedKcores) {
                output.appendRaw(JsonUtil.toJson(union));
            }
        }

        output.endList();
        output.endObject();
        return output.asJson();
    }

    /**
     * Adds {@code kcore} to {@code kcores}, absorbing every existing
     * k-core that intersects it.
     *
     * The given {@code kcore} set is modified in place: the elements of
     * all intersecting k-cores are added to it, and those k-cores are
     * removed from {@code kcores}.
     */
    private static void mergeKcores(Set<Set<Id>> kcores, Set<Id> kcore) {
        /*
         * Collect the overlapping kcores in a first pass, since they are
         * going to be removed from the set being iterated. One new kcore
         * may overlap with several existing ones.
         */
        Set<Set<Id>> overlapping = new HashSet<>();
        for (Set<Id> existing : kcores) {
            if (CollectionUtil.hasIntersection(existing, kcore)) {
                overlapping.add(existing);
            }
        }

        // No-op when nothing overlaps
        for (Set<Id> absorbed : overlapping) {
            kcores.remove(absorbed);
            kcore.addAll(absorbed);
        }

        kcores.add(kcore);
    }
}
/**
 * Computes a k-core on top of the fusiform-similarity traversal.
 */
public static class KcoreTraverser extends FusiformSimilarityTraverser {

    public KcoreTraverser(HugeGraph graph) {
        super(graph);
    }

    /**
     * Runs fusiformSimilarity() for the given vertices and prunes its
     * result down to a k-core.
     *
     * The traversal is called with minNeighbors = floor(k / alpha) and
     * with k - 1 as the sixth argument; the remaining arguments are passed
     * positionally, see FusiformSimilarityTraverser for their meaning.
     *
     * @return the ids of the k-core, or an empty immutable set when the
     *         traversal yields nothing or nothing survives the pruning
     */
    public Set<Id> kcore(Iterator<Vertex> vertices, Directions direction,
                         String label, int k, double alpha, long degree) {
        int minNeighbors = (int) Math.floor(1.0 / alpha * k);
        SimilarsMap map = fusiformSimilarity(vertices, direction, label,
                                             minNeighbors, alpha, k - 1,
                                             0, null, 0, degree,
                                             NO_LIMIT, NO_LIMIT, true);
        if (map.isEmpty()) {
            return ImmutableSet.of();
        }
        return extractKcore(map, k);
    }

    /**
     * Prunes the similars of the single source in {@code similarsMap}
     * until a fixed point is reached, i.e. until:
     * 1. every remaining id occurs in at least k - 1 remaining similars
     * 2. every remaining similar keeps at least k ids
     *
     * @return the source, the ids of the surviving similars and their
     *         surviving ids; an empty immutable set if no similar survives
     */
    @SuppressWarnings("unchecked")
    private static Set<Id> extractKcore(SimilarsMap similarsMap, int k) {
        assert similarsMap.size() == 1;
        Map.Entry<Id, Set<Similar>> entry = similarsMap.entrySet()
                                                       .iterator().next();
        Id source = entry.getKey();
        Set<KcoreSimilar> similars = new HashSet<>();
        for (Similar similar : entry.getValue()) {
            similars.add(new KcoreSimilar(similar));
        }

        boolean stop;
        do {
            stop = true;
            // Do statistics: in how many similars does each id occur
            Map<Id, MutableInt> counts = new HashMap<>();
            for (KcoreSimilar similar : similars) {
                for (Id id : similar.ids()) {
                    counts.computeIfAbsent(id, key -> new MutableInt(0))
                          .increment();
                }
            }
            /*
             * Iterate similars to:
             * 1. delete failed similar
             * 2. delete failed intermediaries in survive similar
             * 3. update statistics
             */
            Set<KcoreSimilar> failedSimilars = new HashSet<>();
            for (KcoreSimilar similar : similars) {
                Set<Id> failedIds = new HashSet<>();
                for (Id id : similar.ids()) {
                    MutableInt count = counts.get(id);
                    if (count.getValue() < k - 1) {
                        count.decrement();
                        failedIds.add(id);
                        stop = false;
                    }
                }

                Set<Id> survivedIds = new HashSet<>(CollectionUtils
                                      .subtract(similar.ids(), failedIds));
                if (survivedIds.size() < k) {
                    for (Id id : survivedIds) {
                        counts.get(id).decrement();
                    }
                    failedSimilars.add(similar);
                    /*
                     * Dropping a similar lowers the counts of its ids,
                     * but similars visited earlier in this pass were
                     * checked against the old counts. Force another pass,
                     * otherwise the result may keep ids whose count is
                     * below k - 1 depending on the iteration order.
                     */
                    stop = false;
                } else {
                    similar.ids(survivedIds);
                }
            }
            similars = new HashSet<>(CollectionUtils.subtract(
                                     similars, failedSimilars));
        } while (!stop);

        if (similars.isEmpty()) {
            return ImmutableSet.of();
        }
        Set<Id> kcores = new HashSet<>();
        kcores.add(source);
        for (KcoreSimilar similar : similars) {
            kcores.add(similar.id());
            kcores.addAll(similar.ids());
        }
        return kcores;
    }
}
/**
 * A Similar that additionally carries a replaceable set of ids, which
 * starts out as a copy of the intermediaries.
 */
private static class KcoreSimilar
               extends FusiformSimilarityTraverser.Similar {

    // Null until first requested when built from raw values
    private Set<Id> ids;

    /**
     * Builds an instance whose id set is created on the first call of
     * ids().
     */
    public KcoreSimilar(Id id, double score, List<Id> intermediaries) {
        super(id, score, intermediaries);
        this.ids = null;
    }

    /**
     * Copies the id, score and intermediaries of {@code similar}, and
     * creates the id set right away.
     */
    public KcoreSimilar(FusiformSimilarityTraverser.Similar similar) {
        super(similar.id(), similar.score(), similar.intermediaries());
        this.ids = new HashSet<>(this.intermediaries());
    }

    /**
     * Returns the current id set, creating it from the intermediaries
     * if it does not exist yet. The returned set is the internal one,
     * not a copy.
     */
    public Set<Id> ids() {
        Set<Id> current = this.ids;
        if (current == null) {
            current = new HashSet<>(this.intermediaries());
            this.ids = current;
        }
        return current;
    }

    /**
     * Replaces the id set with the given one (stored as is, not copied).
     */
    public void ids(Set<Id> ids) {
        this.ids = ids;
    }
}
}