blob: acc8f3c15a517e46acda45cc979e9ded81101e8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.computer.clustering.connected;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.IndexedTraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
/**
 * Identifies "Connected Component" instances in a graph by assigning a component identifier to each vertex. The
 * identifier is the lexicographically least {@code toString()} value of the vertex ids in the component.
 * <p>
 * On the first iteration every vertex labels itself with its own id and sends that id through its message scope
 * (incident edges by default, or the configured edge traversal). On every later iteration a vertex adopts the
 * smallest id it received if that id is smaller than its current label, and re-broadcasts it. The program halts
 * after an iteration in which no vertex votes to continue.
 *
 * @author Stephen Mallette (http://stephen.genoprime.com)
 * @author Daniel Kuppitz (http://gremlin.guru)
 */
public class ConnectedComponentVertexProgram implements VertexProgram<String> {

    /** Default vertex property key under which the component identifier is written. */
    public static final String COMPONENT = "gremlin.connectedComponentVertexProgram.component";

    /** Configuration key for an optional override of the vertex property key that receives the component. */
    private static final String PROPERTY = "gremlin.connectedComponentVertexProgram.property";

    /**
     * Configuration key for an optional edge traversal that defines which edges messages travel along. It is
     * namespaced to this program, like the other keys, so it cannot collide with the edge traversal key used by
     * the PageRank vertex program when both are stored in the same configuration.
     */
    private static final String EDGE_TRAVERSAL = "gremlin.connectedComponentVertexProgram.edgeTraversal";

    /** Memory key combined with {@link Operator#and}: stays {@code true} only if no vertex asked to continue. */
    private static final String VOTE_TO_HALT = "gremlin.connectedComponentVertexProgram.voteToHalt";

    private static final Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true));

    // message scope used to reach neighbours; defaults to all incident edges
    private MessageScope.Local<?> scope = MessageScope.Local.of(__::bothE);
    private Set<MessageScope> scopes;
    private String property = COMPONENT;
    private PureTraversal<Vertex, Edge> edgeTraversal = null;
    // private copy of the configuration supplied to loadState(), re-emitted by storeState()
    private Configuration configuration;
    private TraverserSet<Vertex> haltedTraversers;
    // halted traversers indexed by the vertex they sit on, so execute() can attach them on the first iteration
    private IndexedTraverserSet<Vertex, Vertex> haltedTraversersIndex;

    private ConnectedComponentVertexProgram() {}

    /**
     * Copies {@code config} and initializes the message scope, target property key and halted traversers from it.
     *
     * @param graph  the graph used to rehydrate a stored edge traversal
     * @param config the program configuration; may be {@code null}
     */
    @Override
    public void loadState(final Graph graph, final Configuration config) {
        configuration = new BaseConfiguration();
        if (config != null) {
            ConfigurationUtils.copy(config, configuration);
        }

        if (configuration.containsKey(EDGE_TRAVERSAL)) {
            this.edgeTraversal = PureTraversal.loadState(configuration, EDGE_TRAVERSAL, graph);
            // each message-passing use gets its own clone of the traversal
            this.scope = MessageScope.Local.of(() -> this.edgeTraversal.get().clone());
        }

        scopes = new HashSet<>(Collections.singletonList(scope));

        this.property = configuration.getString(PROPERTY, COMPONENT);

        this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
        this.haltedTraversersIndex = new IndexedTraverserSet<>(v -> v);
        for (final Traverser.Admin<Vertex> traverser : this.haltedTraversers) {
            this.haltedTraversersIndex.add(traverser.split());
        }
    }

    /**
     * Writes the default vertex program state plus every entry of the configuration captured by
     * {@link #loadState(Graph, Configuration)} into {@code config}.
     */
    @Override
    public void storeState(final Configuration config) {
        VertexProgram.super.storeState(config);
        if (configuration != null) {
            ConfigurationUtils.copy(configuration, config);
        }
    }

    /** Initializes the halt vote to {@code true}; any vertex that sends a message flips it to {@code false}. */
    @Override
    public void setup(final Memory memory) {
        memory.set(VOTE_TO_HALT, true);
    }

    /**
     * Assigns or refines the component identifier of {@code vertex}.
     * <p>
     * On the initial iteration the vertex labels itself with its own id and, if it has any edges, broadcasts that
     * id. On later iterations it keeps the smallest id seen (current label or any received message). If that
     * changed, it stores the new label, re-broadcasts it and votes not to halt.
     */
    @Override
    public void execute(final Vertex vertex, final Messenger<String> messenger, final Memory memory) {
        if (memory.isInitialIteration()) {
            copyHaltedTraversersFromMemory(vertex);

            // on the first pass, just initialize the component to its own id then pass it to all adjacent vertices
            // for evaluation
            vertex.property(VertexProperty.Cardinality.single, property, vertex.id().toString());

            // vertices that have no edges remain in their own component - nothing to message pass here.
            // NOTE(review): this checks all incident edges, not the configured edge traversal, so a vertex whose
            // edges are all excluded by that traversal still votes to continue for one more iteration.
            if (vertex.edges(Direction.BOTH).hasNext()) {
                // since there was message passing we don't want to halt on the first round. this should only
                // trigger a single pass finish if the graph is completely disconnected (technically, it won't even
                // really work in cases where halted traversers come into play)
                messenger.sendMessage(scope, vertex.id().toString());
                memory.add(VOTE_TO_HALT, false);
            }
        } else {
            // by the second iteration all vertices that matter should have a component assigned
            String currentComponent = vertex.value(property);
            boolean different = false;

            // iterate through messages received and determine if there is a component that has a lesser value than
            // the currently assigned one
            final Iterator<String> componentIterator = messenger.receiveMessages();
            while (componentIterator.hasNext()) {
                final String candidateComponent = componentIterator.next();
                if (candidateComponent.compareTo(currentComponent) < 0) {
                    currentComponent = candidateComponent;
                    different = true;
                }
            }

            // if there is a better component then assign it and notify adjacent vertices. triggering the message
            // passing should not halt future executions
            if (different) {
                vertex.property(VertexProperty.Cardinality.single, property, currentComponent);
                messenger.sendMessage(scope, currentComponent);
                memory.add(VOTE_TO_HALT, false);
            }
        }
    }

    /** The component property and the halted-traversers property, both declared non-transient. */
    @Override
    public Set<VertexComputeKey> getVertexComputeKeys() {
        return new HashSet<>(Arrays.asList(
                VertexComputeKey.of(property, false),
                VertexComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, false)));
    }

    @Override
    public Set<MemoryComputeKey> getMemoryComputeKeys() {
        return MEMORY_COMPUTE_KEYS;
    }

    /**
     * Decides whether the computation is finished.
     * <p>
     * After the initial iteration the halted-traverser index is cleared, since execute() only reads it then. The
     * program halts if every vertex voted to halt. Otherwise the vote is reset to {@code true} for the next
     * iteration.
     *
     * @return {@code true} if no vertex changed (or announced) its component in the last iteration
     */
    @Override
    public boolean terminate(final Memory memory) {
        if (memory.isInitialIteration() && this.haltedTraversersIndex != null) {
            this.haltedTraversersIndex.clear();
        }

        final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
        if (voteToHalt) {
            return true;
        } else {
            // it is basically always assumed that the program will want to halt, but if message passing occurs, the
            // program will want to continue, thus reset false values to true for future iterations
            memory.set(VOTE_TO_HALT, true);
            return false;
        }
    }

    @Override
    public Set<MessageScope> getMessageScopes(final Memory memory) {
        return scopes;
    }

    @Override
    public GraphComputer.ResultGraph getPreferredResultGraph() {
        return GraphComputer.ResultGraph.NEW;
    }

    @Override
    public GraphComputer.Persist getPreferredPersist() {
        return GraphComputer.Persist.VERTEX_PROPERTIES;
    }

    /**
     * Returns this same instance rather than a copy. The fields set in loadState() are only read during execute();
     * terminate() clears the halted-traverser index after the first iteration.
     */
    @Override
    @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
    public ConnectedComponentVertexProgram clone() {
        return this;
    }

    /** Declares that this program needs local message scopes and adds vertex properties. */
    @Override
    public Features getFeatures() {
        return new Features() {
            @Override
            public boolean requiresLocalMessageScopes() {
                return true;
            }

            @Override
            public boolean requiresVertexPropertyAddition() {
                return true;
            }
        };
    }

    /** Attaches to {@code vertex}, as a fresh set, any halted traversers that were loaded for it. */
    private void copyHaltedTraversersFromMemory(final Vertex vertex) {
        final Collection<Traverser.Admin<Vertex>> traversers = this.haltedTraversersIndex.get(vertex);
        if (traversers != null) {
            final TraverserSet<Vertex> newHaltedTraversers = new TraverserSet<>();
            newHaltedTraversers.addAll(traversers);
            vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, newHaltedTraversers);
        }
    }

    /** Creates a builder for configuring a {@link ConnectedComponentVertexProgram}. */
    public static ConnectedComponentVertexProgram.Builder build() {
        return new ConnectedComponentVertexProgram.Builder();
    }

    /** Fluent builder that records program options into the vertex program configuration. */
    public static final class Builder extends AbstractVertexProgramBuilder<ConnectedComponentVertexProgram.Builder> {

        private Builder() {
            super(ConnectedComponentVertexProgram.class);
        }

        /**
         * Restricts message passing to the edges produced by {@code edgeTraversal} instead of all incident edges.
         *
         * @param edgeTraversal traversal from a vertex to the edges messages should follow
         */
        public ConnectedComponentVertexProgram.Builder edges(final Traversal.Admin<Vertex, Edge> edgeTraversal) {
            PureTraversal.storeState(this.configuration, EDGE_TRAVERSAL, edgeTraversal);
            return this;
        }

        /**
         * Sets the vertex property key that receives the component identifier (defaults to {@link #COMPONENT}).
         *
         * @param key the property key to write
         */
        public ConnectedComponentVertexProgram.Builder property(final String key) {
            this.configuration.setProperty(PROPERTY, key);
            return this;
        }
    }
}