blob: d16c8b849c3207dce5cf239d61de139854818ed9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.computer;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.tinkerpop.gremlin.ExceptionCoverage;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.computer.util.StaticVertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.P;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyPath;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.javatuples.Pair;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.GRATEFUL;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.SINK;
import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.outE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeNoException;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@ExceptionCoverage(exceptionClass = GraphComputer.Exceptions.class, methods = {
"providedKeyIsNotAMemoryComputeKey",
"computerHasNoVertexProgramNorMapReducers",
"computerHasAlreadyBeenSubmittedAVertexProgram",
"providedKeyIsNotAnElementComputeKey",
"incidentAndAdjacentElementsCanNotBeAccessedInMapReduce",
"adjacentVertexLabelsCanNotBeRead",
"adjacentVertexPropertiesCanNotBeReadOrUpdated",
"adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated",
"resultGraphPersistCombinationNotSupported",
"vertexPropertiesCanNotBeUpdatedInMapReduce",
"computerRequiresMoreWorkersThanSupported",
"vertexFilterAccessesIncidentEdges",
"edgeFilterAccessesAdjacentVertices",
"graphFilterNotSupported"
})
@ExceptionCoverage(exceptionClass = Memory.Exceptions.class, methods = {
"memoryKeyCanNotBeEmpty",
"memoryKeyCanNotBeNull",
"memoryValueCanNotBeNull",
"memoryIsCurrentlyImmutable",
"memoryDoesNotExist",
"memorySetOnlyDuringVertexProgramSetUpAndTerminate",
"memoryAddOnlyDuringVertexProgramExecute",
"adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated"
})
@ExceptionCoverage(exceptionClass = Graph.Exceptions.class, methods = {
"graphDoesNotSupportProvidedGraphComputer"
})
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
public class GraphComputerTest extends AbstractGremlinProcessTest {
@Test
@LoadGraphWith(MODERN)
public void shouldHaveStandardStringRepresentation() {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
assertEquals(StringFactory.graphComputerString(computer), computer.toString());
}
@Test
@LoadGraphWith(MODERN)
public void shouldNotAllowWithNoVertexProgramNorMapReducers() throws Exception {
try {
graphProvider.getGraphComputer(graph).submit().get();
fail("Should throw an IllegalStateException when there is no vertex program nor map reducers");
} catch (Exception ex) {
validateException(GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers(), ex);
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldNotAllowBadGraphComputers() {
try {
graph.compute(BadGraphComputer.class);
fail("Providing a bad graph computer class should fail");
} catch (Exception ex) {
validateException(Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(BadGraphComputer.class), ex);
}
if (!new BadGraphComputer().features().supportsGraphFilter()) {
try {
new BadGraphComputer().vertices(__.hasLabel("software"));
fail("Should throw an unsupported operation exception");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
}
try {
new BadGraphComputer().edges(__.bothE());
fail("Should throw an unsupported operation exception");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
}
} else {
fail("Should not support graph filter: " + BadGraphComputer.class);
}
}
public static class BadGraphComputer implements GraphComputer {
@Override
public GraphComputer result(final ResultGraph resultGraph) {
return null;
}
@Override
public GraphComputer persist(final Persist persist) {
return null;
}
@Override
public GraphComputer program(final VertexProgram vertexProgram) {
return null;
}
@Override
public GraphComputer mapReduce(final MapReduce mapReduce) {
return null;
}
@Override
public GraphComputer workers(final int workers) {
return null;
}
@Override
public GraphComputer vertices(final Traversal<Vertex, Vertex> vertexFilter) {
throw GraphComputer.Exceptions.graphFilterNotSupported();
}
@Override
public GraphComputer edges(final Traversal<Vertex, Edge> edgeFilter) {
throw GraphComputer.Exceptions.graphFilterNotSupported();
}
@Override
public GraphComputer configure(final String key, final Object value) {
return null;
}
@Override
public Future<ComputerResult> submit() {
return null;
}
public GraphComputer.Features features() {
return new Features() {
@Override
public boolean supportsGraphFilter() {
return false;
}
};
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldHaveImmutableComputeResultMemory() throws Exception {
final ComputerResult results = graphProvider.getGraphComputer(graph).program(new VertexProgramB()).submit().get();
try {
results.memory().set("set", "test");
} catch (Exception ex) {
validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
}
try {
results.memory().add("incr", 1);
} catch (Exception ex) {
validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
}
try {
results.memory().add("and", true);
} catch (Exception ex) {
validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
}
try {
results.memory().add("or", false);
} catch (Exception ex) {
validateException(Memory.Exceptions.memoryIsCurrentlyImmutable(), ex);
}
}
public static class VertexProgramB extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
assertEquals(0, memory.getIteration());
assertTrue(memory.isInitialIteration());
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
assertEquals(0, memory.getIteration());
assertTrue(memory.isInitialIteration());
}
@Override
public boolean terminate(final Memory memory) {
assertEquals(0, memory.getIteration());
assertTrue(memory.isInitialIteration());
return true;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return new HashSet<>(Arrays.asList(
MemoryComputeKey.of("set", Operator.assign, true, false),
MemoryComputeKey.of("incr", Operator.sum, true, false),
MemoryComputeKey.of("and", Operator.and, true, false),
MemoryComputeKey.of("or", Operator.or, true, false)));
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldNotAllowNullMemoryKeys() throws Exception {
try {
graphProvider.getGraphComputer(graph).program(new VertexProgramC()).submit().get();
fail("Providing null memory key should fail");
} catch (Exception ex) {
// validateException(Memory.Exceptions.memoryKeyCanNotBeNull(), ex);
}
}
public static class VertexProgramC extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
}
@Override
public boolean terminate(final Memory memory) {
return true;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return Collections.singleton(MemoryComputeKey.of(null, Operator.or, true, false));
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldNotAllowEmptyMemoryKeys() throws Exception {
try {
graphProvider.getGraphComputer(graph).program(new VertexProgramD()).submit().get();
fail("Providing empty memory key should fail");
} catch (Exception ex) {
validateException(Memory.Exceptions.memoryKeyCanNotBeEmpty(), ex);
}
}
public static class VertexProgramD extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
}
@Override
public boolean terminate(final Memory memory) {
return true;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return Collections.singleton(MemoryComputeKey.of("", Operator.or, true, false));
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
////////////////////////////////////////////
////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldHandleUndeclaredMemoryKeysCorrectly() throws Exception {
graphProvider.getGraphComputer(graph).program(new VertexProgramE()).submit().get();
}
public static class VertexProgramE extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
try {
memory.get("a");
fail("The memory key does not exist and should fail");
} catch (Exception e) {
validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
}
try {
memory.set("a", true);
fail("Setting a memory key that wasn't declared should fail");
} catch (Exception e) {
validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
}
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
try {
memory.get("a");
fail("The memory key does not exist and should fail");
} catch (Exception e) {
validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
}
try {
memory.add("a", true);
fail("Setting a memory key that wasn't declared should fail");
} catch (Exception e) {
validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
}
}
@Override
public boolean terminate(final Memory memory) {
try {
memory.get("a");
fail("The memory key does not exist and should fail");
} catch (Exception e) {
validateException(Memory.Exceptions.memoryDoesNotExist("a"), e);
}
try {
memory.set("a", true);
// fail("Setting a memory key that wasn't declared should fail");
} catch (Exception e) {
validateException(GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey("a"), e);
}
return true;
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldNotAllowTheSameComputerToExecutedTwice() throws Exception {
final GraphComputer computer = graphProvider.getGraphComputer(graph).program(new VertexProgramA());
computer.submit().get(); // this should work as its the first run of the graph computer
try {
computer.submit(); // this should fail as the computer has already been executed
fail("Using the same graph computer to compute again should not be possible");
} catch (IllegalStateException e) {
} catch (Exception e) {
fail("Should yield an illegal state exception for graph computer being executed twice");
}
// test no rerun of graph computer
try {
computer.submit(); // this should fail as the computer has already been executed even through new program submitted
fail("Using the same graph computer to compute again should not be possible");
} catch (IllegalStateException e) {
} catch (Exception e) {
fail("Should yield an illegal state exception for graph computer being executed twice");
}
}
public static class VertexProgramA extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
}
@Override
public boolean terminate(final Memory memory) {
return true;
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldHaveConsistentMemoryVertexPropertiesAndExceptions() throws Exception {
ComputerResult results = graphProvider.getGraphComputer(graph).program(new VertexProgramF()).submit().get();
assertEquals(1, results.memory().getIteration());
assertEquals(2, results.memory().asMap().size());
assertEquals(2, results.memory().keys().size());
assertTrue(results.memory().keys().contains("a"));
assertTrue(results.memory().keys().contains("b"));
assertTrue(results.memory().getRuntime() >= 0);
assertEquals(12, results.memory().<Integer>get("a").intValue()); // 2 iterations
assertEquals(28, results.memory().<Integer>get("b").intValue());
try {
results.memory().get("BAD");
fail("Should throw an IllegalArgumentException");
} catch (IllegalArgumentException e) {
assertEquals(Memory.Exceptions.memoryDoesNotExist("BAD").getMessage(), e.getMessage());
}
assertEquals(Long.valueOf(0), results.graph().traversal().V().count().next()); // persist new/nothing.
results.graph().traversal().V().forEachRemaining(v -> {
assertTrue(v.property("nameLengthCounter").isPresent());
assertEquals(Integer.valueOf(v.<String>value("name").length() * 2), Integer.valueOf(v.<Integer>value("nameLengthCounter")));
});
}
public static class VertexProgramF extends StaticVertexProgram<Object> {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger<Object> messenger, final Memory memory) {
try {
vertex.property(VertexProperty.Cardinality.single, "blah", "blah");
fail("Should throw an IllegalArgumentException");
} catch (final IllegalArgumentException e) {
assertEquals(GraphComputer.Exceptions.providedKeyIsNotAnElementComputeKey("blah").getMessage(), e.getMessage());
} catch (final Exception e) {
fail("Should throw an IllegalArgumentException: " + e);
}
memory.add("a", 1);
if (memory.isInitialIteration()) {
vertex.property(VertexProperty.Cardinality.single, "nameLengthCounter", vertex.<String>value("name").length());
memory.add("b", vertex.<String>value("name").length());
} else {
vertex.property(VertexProperty.Cardinality.single, "nameLengthCounter", vertex.<String>value("name").length() + vertex.<Integer>value("nameLengthCounter"));
}
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() == 1;
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return Collections.singleton(VertexComputeKey.of("nameLengthCounter", false));
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return new HashSet<>(Arrays.asList(
MemoryComputeKey.of("a", Operator.sum, true, false),
MemoryComputeKey.of("b", Operator.sum, true, false)));
}
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldAndOrIncrCorrectlyThroughSubStages() throws Exception {
ComputerResult results = graphProvider.getGraphComputer(graph).program(new VertexProgramG()).submit().get();
assertEquals(2, results.memory().getIteration());
assertEquals(6, results.memory().asMap().size());
assertEquals(6, results.memory().keys().size());
assertTrue(results.memory().keys().contains("a"));
assertTrue(results.memory().keys().contains("b"));
assertTrue(results.memory().keys().contains("c"));
assertTrue(results.memory().keys().contains("d"));
assertTrue(results.memory().keys().contains("e"));
assertTrue(results.memory().keys().contains("f"));
assertEquals(Long.valueOf(18), results.memory().get("a"));
assertEquals(Long.valueOf(0), results.memory().get("b"));
assertFalse(results.memory().get("c"));
assertTrue(results.memory().get("d"));
assertTrue(results.memory().get("e"));
assertEquals(3, results.memory().<Integer>get("f").intValue());
}
public static class VertexProgramG extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
memory.set("a", 0l);
memory.set("b", 0l);
memory.set("c", true);
memory.set("d", false);
memory.set("e", true);
memory.set("f", memory.getIteration());
try {
memory.add("a", 0l);
fail("Should only allow Memory.set() during VertexProgram.setup()");
} catch (final Exception e) {
validateException(Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute("a"), e);
}
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
// test current step values
assertEquals(Long.valueOf(6 * memory.getIteration()), memory.get("a"));
assertEquals(Long.valueOf(0), memory.get("b"));
if (memory.isInitialIteration()) {
assertTrue(memory.get("c"));
assertFalse(memory.get("d"));
} else {
assertFalse(memory.get("c"));
assertTrue(memory.get("d"));
}
assertTrue(memory.get("e"));
assertEquals(memory.getIteration(), memory.<Integer>get("f").intValue());
// update current step values
memory.add("a", 1l);
memory.add("b", 1l);
memory.add("c", false);
memory.add("d", true);
memory.add("e", false);
memory.add("f", memory.getIteration() + 1);
// test current step values, should be the same as previous prior to update
assertEquals(Long.valueOf(6 * memory.getIteration()), memory.get("a"));
assertEquals(Long.valueOf(0), memory.get("b"));
if (memory.isInitialIteration()) {
assertTrue(memory.get("c"));
assertFalse(memory.get("d"));
} else {
assertFalse(memory.get("c"));
assertTrue(memory.get("d"));
}
assertTrue(memory.get("e"));
assertEquals(memory.getIteration(), memory.<Integer>get("f").intValue());
try {
memory.set("a", 0l);
fail("Should only allow Memory.add() during VertexProgram.execute()");
} catch (final Exception e) {
validateException(Memory.Exceptions.memorySetOnlyDuringVertexProgramSetUpAndTerminate("a"), e);
}
}
@Override
public boolean terminate(Memory memory) {
assertEquals(Long.valueOf(6 * (memory.getIteration() + 1)), memory.get("a"));
assertEquals(Long.valueOf(6), memory.get("b"));
assertFalse(memory.get("c"));
assertTrue(memory.get("d"));
assertFalse(memory.get("e"));
assertEquals(memory.getIteration() + 1, memory.<Integer>get("f").intValue());
memory.set("b", 0l);
memory.set("e", true);
try {
memory.add("a", 0l);
fail("Should only allow Memory.set() during VertexProgram.terminate()");
} catch (final Exception e) {
validateException(Memory.Exceptions.memoryAddOnlyDuringVertexProgramExecute("a"), e);
}
return memory.getIteration() > 1;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return new HashSet<>(Arrays.asList(
MemoryComputeKey.of("a", Operator.sum, true, false),
MemoryComputeKey.of("b", Operator.sum, true, false),
MemoryComputeKey.of("c", Operator.and, true, false),
MemoryComputeKey.of("d", Operator.or, true, false),
MemoryComputeKey.of("e", Operator.and, true, false),
MemoryComputeKey.of("f", Operator.assign, true, false)));
}
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldAllowMapReduceWithNoVertexProgram() throws Exception {
final ComputerResult results = graphProvider.getGraphComputer(graph).mapReduce(new MapReduceA()).submit().get();
assertEquals(123, results.memory().<Integer>get("ageSum").intValue());
}
private static class MapReduceA extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
@Override
public boolean doStage(final Stage stage) {
return stage.equals(Stage.MAP) || stage.equals(Stage.REDUCE);
}
@Override
public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
vertex.<Integer>property("age").ifPresent(emitter::emit);
}
@Override
public void reduce(NullObject key, Iterator<Integer> values, ReduceEmitter<NullObject, Integer> emitter) {
int sum = 0;
while (values.hasNext()) {
sum = sum + values.next();
}
emitter.emit(sum);
}
@Override
public Integer generateFinalResult(Iterator<KeyValue<NullObject, Integer>> keyValues) {
return keyValues.next().getValue();
}
@Override
public String getMemoryKey() {
return "ageSum";
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSupportMultipleMapReduceJobs() throws Exception {
final ComputerResult results = graphProvider.getGraphComputer(graph)
.program(new VertexProgramH())
.mapReduce(new MapReduceH1())
.mapReduce(new MapReduceH2()).submit().get();
assertEquals(60, results.memory().<Integer>get("a").intValue());
assertEquals(1, results.memory().<Integer>get("b").intValue());
}
public static class VertexProgramH extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(Vertex vertex, Messenger messenger, Memory memory) {
vertex.property(VertexProperty.Cardinality.single, "counter", memory.isInitialIteration() ? 1 : vertex.<Integer>value("counter") + 1);
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() > 8;
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return Collections.singleton(VertexComputeKey.of("counter", false));
}
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
private static class MapReduceH1 extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
@Override
public boolean doStage(final Stage stage) {
return stage.equals(Stage.MAP) || stage.equals(Stage.REDUCE);
}
@Override
public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
vertex.<Integer>property("counter").ifPresent(emitter::emit);
}
@Override
public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
int sum = 0;
while (values.hasNext()) {
sum = sum + values.next();
}
emitter.emit(sum);
}
@Override
public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
return keyValues.next().getValue();
}
@Override
public String getMemoryKey() {
return "a";
}
}
private static class MapReduceH2 extends StaticMapReduce<Integer, Integer, Integer, Integer, Integer> {
@Override
public boolean doStage(final Stage stage) {
return true;
}
@Override
public void map(final Vertex vertex, final MapEmitter<Integer, Integer> emitter) {
vertex.<Integer>property("age").ifPresent(age -> emitter.emit(age, age));
}
@Override
public void combine(Integer key, Iterator<Integer> values, ReduceEmitter<Integer, Integer> emitter) {
values.forEachRemaining(i -> emitter.emit(i, 1));
}
@Override
public void reduce(Integer key, Iterator<Integer> values, ReduceEmitter<Integer, Integer> emitter) {
values.forEachRemaining(i -> emitter.emit(i, 1));
}
@Override
public Integer generateFinalResult(Iterator<KeyValue<Integer, Integer>> keyValues) {
return keyValues.next().getValue();
}
@Override
public String getMemoryKey() {
return "b";
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSortReduceOutput() throws Exception {
final ComputerResult results = graphProvider.getGraphComputer(graph).mapReduce(new MapReduceB()).submit().get();
final List<Integer> nameLengths = results.memory().get("nameLengths");
assertEquals(6, nameLengths.size());
for (int i = 1; i < nameLengths.size(); i++) {
assertTrue(nameLengths.get(i) <= nameLengths.get(i - 1));
}
}
public static class MapReduceB extends StaticMapReduce<Integer, Integer, Integer, Integer, List<Integer>> {
@Override
public boolean doStage(final Stage stage) {
return stage.equals(Stage.REDUCE) || stage.equals(Stage.MAP);
}
@Override
public void map(final Vertex vertex, final MapEmitter<Integer, Integer> emitter) {
emitter.emit(vertex.<String>value("name").length(), vertex.<String>value("name").length());
}
@Override
public void reduce(Integer key, Iterator<Integer> values, ReduceEmitter<Integer, Integer> emitter) {
values.forEachRemaining(id -> emitter.emit(id, id));
}
@Override
public Optional<Comparator<Integer>> getReduceKeySort() {
return Optional.of(Comparator.<Integer>reverseOrder());
}
@Override
public String getMemoryKey() {
return "nameLengths";
}
@Override
public List<Integer> generateFinalResult(final Iterator<KeyValue<Integer, Integer>> keyValues) {
final List<Integer> list = new ArrayList<>();
keyValues.forEachRemaining(nameLength -> list.add(nameLength.getKey()));
return list;
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSortMapOutput() throws Exception {
final ComputerResult results = graphProvider.getGraphComputer(graph).mapReduce(new MapReduceBB()).submit().get();
final List<Integer> nameLengths = results.memory().get("nameLengths");
assertEquals(6, nameLengths.size());
for (int i = 1; i < nameLengths.size(); i++) {
assertTrue(nameLengths.get(i) <= nameLengths.get(i - 1));
}
}
public static class MapReduceBB extends StaticMapReduce<Integer, Integer, Integer, Integer, List<Integer>> {
@Override
public boolean doStage(final Stage stage) {
return stage.equals(Stage.MAP);
}
@Override
public void map(final Vertex vertex, final MapEmitter<Integer, Integer> emitter) {
emitter.emit(vertex.<String>value("name").length(), vertex.<String>value("name").length());
}
@Override
public Optional<Comparator<Integer>> getMapKeySort() {
return Optional.of(Comparator.<Integer>reverseOrder());
}
@Override
public String getMemoryKey() {
return "nameLengths";
}
@Override
public List<Integer> generateFinalResult(final Iterator<KeyValue<Integer, Integer>> keyValues) {
final List<Integer> list = new ArrayList<>();
keyValues.forEachRemaining(nameLength -> list.add(nameLength.getKey()));
return list;
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldOnlyAllowReadingVertexPropertiesInMapReduce() throws Exception {
graphProvider.getGraphComputer(graph).mapReduce(new MapReduceC()).submit().get();
}
public static class MapReduceC extends StaticMapReduce<MapReduce.NullObject, MapReduce.NullObject, MapReduce.NullObject, MapReduce.NullObject, MapReduce.NullObject> {
@Override
public boolean doStage(final Stage stage) {
return stage.equals(Stage.MAP);
}
@Override
public void map(final Vertex vertex, final MapEmitter<MapReduce.NullObject, MapReduce.NullObject> emitter) {
try {
vertex.edges(Direction.OUT);
fail("Edges should not be accessible in MapReduce.map()");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.incidentAndAdjacentElementsCanNotBeAccessedInMapReduce().getMessage(), e.getMessage());
}
try {
vertex.edges(Direction.IN);
fail("Edges should not be accessible in MapReduce.map()");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.incidentAndAdjacentElementsCanNotBeAccessedInMapReduce().getMessage(), e.getMessage());
}
try {
vertex.edges(Direction.BOTH);
fail("Edges should not be accessible in MapReduce.map()");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.incidentAndAdjacentElementsCanNotBeAccessedInMapReduce().getMessage(), e.getMessage());
}
////
try {
vertex.property("name", "bob");
fail("Vertex properties should be immutable in MapReduce.map()");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.vertexPropertiesCanNotBeUpdatedInMapReduce().getMessage(), e.getMessage());
}
try {
vertex.property("name").property("test", 1);
fail("Vertex properties should be immutable in MapReduce.map()");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.vertexPropertiesCanNotBeUpdatedInMapReduce().getMessage(), e.getMessage());
}
}
@Override
public String getMemoryKey() {
return "nothing";
}
@Override
public MapReduce.NullObject generateFinalResult(final Iterator<KeyValue<MapReduce.NullObject, MapReduce.NullObject>> keyValues) {
return MapReduce.NullObject.instance();
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldOnlyAllowIDAccessOfAdjacentVertices() throws Exception {
graphProvider.getGraphComputer(graph).program(new VertexProgramI()).submit().get();
}
public static class VertexProgramI extends StaticVertexProgram<MapReduce.NullObject> {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(Vertex vertex, Messenger messenger, Memory memory) {
vertex.vertices(Direction.OUT).forEachRemaining(Vertex::id);
vertex.vertices(Direction.IN).forEachRemaining(Vertex::id);
vertex.vertices(Direction.BOTH).forEachRemaining(Vertex::id);
if (vertex.vertices(Direction.OUT).hasNext()) {
try {
vertex.vertices(Direction.OUT).forEachRemaining(Vertex::label);
fail("Adjacent vertex labels should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexLabelsCanNotBeRead().getMessage(), e.getMessage());
}
}
if (vertex.vertices(Direction.IN).hasNext()) {
try {
vertex.vertices(Direction.IN).forEachRemaining(Vertex::label);
fail("Adjacent vertex labels should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexLabelsCanNotBeRead().getMessage(), e.getMessage());
}
}
if (vertex.vertices(Direction.BOTH).hasNext()) {
try {
vertex.vertices(Direction.BOTH).forEachRemaining(Vertex::label);
fail("Adjacent vertex labels should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexLabelsCanNotBeRead().getMessage(), e.getMessage());
}
}
////////////////////
if (vertex.vertices(Direction.OUT).hasNext()) {
try {
vertex.vertices(Direction.OUT).forEachRemaining(v -> v.property("name"));
fail("Adjacent vertex properties should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexPropertiesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
}
}
if (vertex.vertices(Direction.IN).hasNext()) {
try {
vertex.vertices(Direction.IN).forEachRemaining(v -> v.property("name"));
fail("Adjacent vertex properties should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexPropertiesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
}
}
if (vertex.vertices(Direction.BOTH).hasNext()) {
try {
vertex.vertices(Direction.BOTH).forEachRemaining(v -> v.property("name"));
fail("Adjacent vertex properties should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexPropertiesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
}
}
////////////////////
if (vertex.vertices(Direction.BOTH).hasNext()) {
try {
vertex.vertices(Direction.BOTH).forEachRemaining(v -> v.edges(Direction.BOTH));
fail("Adjacent vertex edges should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
}
}
if (vertex.vertices(Direction.BOTH).hasNext()) {
try {
vertex.vertices(Direction.BOTH).forEachRemaining(v -> v.vertices(Direction.BOTH));
fail("Adjacent vertex vertices should not be accessible in VertexProgram.execute()");
} catch (UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.adjacentVertexEdgesAndVerticesCanNotBeReadOrUpdated().getMessage(), e.getMessage());
}
}
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() > 1;
}
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldStartAndEndWorkersForVertexProgramAndMapReduce() throws Exception {
MapReduceI.WORKER_START.clear();
MapReduceI.WORKER_END.clear();
assertEquals(3, graphProvider.getGraphComputer(graph).program(new VertexProgramJ()).mapReduce(new MapReduceI()).submit().get().memory().<Integer>get("a").intValue());
if (MapReduceI.WORKER_START.size() == 2) {
assertEquals(2, MapReduceI.WORKER_START.size());
assertTrue(MapReduceI.WORKER_START.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_START.contains(MapReduce.Stage.REDUCE));
} else {
assertEquals(3, MapReduceI.WORKER_START.size());
assertTrue(MapReduceI.WORKER_START.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_START.contains(MapReduce.Stage.COMBINE) && MapReduceI.WORKER_START.contains(MapReduce.Stage.REDUCE));
}
if (MapReduceI.WORKER_END.size() == 2) {
assertEquals(2, MapReduceI.WORKER_END.size());
assertTrue(MapReduceI.WORKER_END.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_END.contains(MapReduce.Stage.REDUCE));
} else {
assertEquals(3, MapReduceI.WORKER_END.size());
assertTrue(MapReduceI.WORKER_END.contains(MapReduce.Stage.MAP) && MapReduceI.WORKER_END.contains(MapReduce.Stage.COMBINE) && MapReduceI.WORKER_END.contains(MapReduce.Stage.REDUCE));
}
}
public static class VertexProgramJ extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
memory.set("test", memory.getIteration());
}
@Override
public void workerIterationStart(final Memory memory) {
assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
try {
memory.add("test", memory.getIteration());
fail("Should throw an immutable memory exception");
} catch (IllegalStateException e) {
assertEquals(Memory.Exceptions.memoryIsCurrentlyImmutable().getMessage(), e.getMessage());
}
}
@Override
public void execute(Vertex vertex, Messenger messenger, Memory memory) {
assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
memory.add("test", 1);
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() > 3;
}
@Override
public void workerIterationEnd(final Memory memory) {
assertEquals(memory.getIteration() * 6, memory.<Integer>get("test").intValue());
try {
memory.set("test", memory.getIteration());
fail("Should throw an immutable memory exception");
} catch (IllegalStateException e) {
assertEquals(Memory.Exceptions.memoryIsCurrentlyImmutable().getMessage(), e.getMessage());
}
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return Collections.singleton(MemoryComputeKey.of("test", Operator.sum, true, false));
}
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
}
private static class MapReduceI extends StaticMapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
private static final Set<Stage> WORKER_START = new ConcurrentSkipListSet<>();
private static final Set<Stage> WORKER_END = new ConcurrentSkipListSet<>();
@Override
public boolean doStage(final Stage stage) {
return true;
}
@Override
public void workerStart(final Stage stage) {
WORKER_START.add(stage);
if (!stage.equals(Stage.MAP))
assertFalse(WORKER_END.isEmpty());
}
@Override
public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
emitter.emit(1);
assertEquals(1, WORKER_START.size());
assertTrue(WORKER_START.contains(Stage.MAP));
}
@Override
public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
emitter.emit(2);
assertEquals(2, WORKER_START.size());
assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.COMBINE));
assertFalse(WORKER_END.isEmpty());
}
@Override
public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
emitter.emit(3);
if (WORKER_START.size() == 2) {
assertEquals(2, WORKER_START.size());
assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.REDUCE));
} else {
assertEquals(3, WORKER_START.size());
assertTrue(WORKER_START.contains(Stage.MAP) && WORKER_START.contains(Stage.COMBINE) && WORKER_START.contains(Stage.REDUCE));
}
assertFalse(WORKER_END.isEmpty());
}
@Override
public void workerEnd(final Stage stage) {
assertFalse(WORKER_START.isEmpty());
if (!stage.equals(Stage.MAP))
assertFalse(WORKER_END.isEmpty());
WORKER_END.add(stage);
}
@Override
public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
assertEquals(3, keyValues.next().getValue().intValue());
return 3;
}
@Override
public String getMemoryKey() {
return "a";
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith
public void shouldSupportPersistResultGraphPairsStatedInFeatures() throws Exception {
for (final GraphComputer.ResultGraph resultGraph : GraphComputer.ResultGraph.values()) {
for (final GraphComputer.Persist persist : GraphComputer.Persist.values()) {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
if (computer.features().supportsResultGraphPersistCombination(resultGraph, persist)) {
computer.program(new VertexProgramK()).result(resultGraph).persist(persist).submit().get();
} else {
try {
computer.program(new VertexProgramK()).result(resultGraph).persist(persist).submit().get();
fail("The GraphComputer " + computer + " states that it does support the following resultGraph/persist pair: " + resultGraph + ":" + persist);
} catch (final IllegalArgumentException e) {
assertEquals(GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(resultGraph, persist).getMessage(), e.getMessage());
}
}
}
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldProcessResultGraphNewWithPersistNothing() throws Exception {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.NOTHING)) {
final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.NOTHING).submit().get();
assertEquals(Long.valueOf(0l), result.graph().traversal().V().count().next());
assertEquals(Long.valueOf(0l), result.graph().traversal().E().count().next());
assertEquals(Long.valueOf(0l), result.graph().traversal().V().values().count().next());
assertEquals(Long.valueOf(0l), result.graph().traversal().E().values().count().next());
assertFalse(result.graph().traversal().V().values("money").sum().hasNext());
///
assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
assertFalse(graph.traversal().V().values("money").sum().hasNext());
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldProcessResultGraphNewWithPersistVertexProperties() throws Exception {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.VERTEX_PROPERTIES)) {
final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.VERTEX_PROPERTIES).submit().get();
assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
assertEquals(Long.valueOf(0l), result.graph().traversal().E().count().next());
assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
assertEquals(Long.valueOf(0l), result.graph().traversal().E().values().count().next());
assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
///
assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
assertFalse(graph.traversal().V().values("money").sum().hasNext());
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldProcessResultGraphNewWithPersistEdges() throws Exception {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.NEW, GraphComputer.Persist.EDGES)) {
final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit().get();
assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
///
assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
assertFalse(graph.traversal().V().values("money").sum().hasNext());
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldProcessResultGraphOriginalWithPersistNothing() throws Exception {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.ORIGINAL, GraphComputer.Persist.NOTHING)) {
final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.ORIGINAL).persist(GraphComputer.Persist.NOTHING).submit().get();
assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
assertEquals(Long.valueOf(12l), result.graph().traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
assertFalse(result.graph().traversal().V().values("money").sum().hasNext());
///
assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
assertEquals(Long.valueOf(12l), graph.traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
assertFalse(graph.traversal().V().values("money").sum().hasNext());
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldProcessResultGraphOriginalWithPersistVertexProperties() throws Exception {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.ORIGINAL, GraphComputer.Persist.VERTEX_PROPERTIES)) {
final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.ORIGINAL).persist(GraphComputer.Persist.VERTEX_PROPERTIES).submit().get();
assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
///
assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
assertEquals(Long.valueOf(18l), graph.traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
assertEquals(28l, graph.traversal().V().values("money").sum().next());
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldProcessResultGraphOriginalWithPersistEdges() throws Exception {
final GraphComputer computer = graphProvider.getGraphComputer(graph);
if (computer.features().supportsResultGraphPersistCombination(GraphComputer.ResultGraph.ORIGINAL, GraphComputer.Persist.EDGES)) {
final ComputerResult result = computer.program(new VertexProgramK()).result(GraphComputer.ResultGraph.ORIGINAL).persist(GraphComputer.Persist.EDGES).submit().get();
assertEquals(Long.valueOf(6l), result.graph().traversal().V().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().count().next());
assertEquals(Long.valueOf(18l), result.graph().traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), result.graph().traversal().E().values().count().next());
assertEquals(28l, result.graph().traversal().V().values("money").sum().next());
///
assertEquals(Long.valueOf(6l), graph.traversal().V().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().count().next());
assertEquals(Long.valueOf(18l), graph.traversal().V().values().count().next());
assertEquals(Long.valueOf(6l), graph.traversal().E().values().count().next());
assertEquals(28l, graph.traversal().V().values("money").sum().next());
}
}
public static class VertexProgramK extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
vertex.property("money", vertex.<String>value("name").length());
}
@Override
public boolean terminate(final Memory memory) {
return true;
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return Collections.singleton(VertexComputeKey.of("money", false));
}
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.EDGES;
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(GRATEFUL)
public void shouldSupportWorkerCount() throws Exception {
int maxWorkers = graphProvider.getGraphComputer(graph).features().getMaxWorkers();
if (maxWorkers != Integer.MAX_VALUE) {
for (int i = maxWorkers + 1; i < maxWorkers + 10; i++) {
try {
graphProvider.getGraphComputer(graph).program(new VertexProgramL()).workers(i).submit().get();
fail("Should throw a GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported() exception");
} catch (final IllegalArgumentException e) {
assertTrue(e.getMessage().contains("computer requires more workers"));
}
}
}
if (maxWorkers > 25) maxWorkers = 25;
for (int i = 1; i <= maxWorkers; i++) {
ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramL()).workers(i).submit().get();
assertEquals(Integer.valueOf(i).longValue(), (long) result.memory().get("workerCount"));
}
}
public static class VertexProgramL implements VertexProgram {
boolean announced = false;
@Override
public void setup(final Memory memory) {
memory.set("workerCount", 0l);
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
try {
Thread.sleep(1);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!this.announced) {
memory.add("workerCount", 1l);
this.announced = true;
}
}
@Override
public boolean terminate(final Memory memory) {
return true;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return Collections.singleton(MemoryComputeKey.of("workerCount", Operator.sum, true, false));
}
/*public void workerIterationStart(final Memory memory) {
assertEquals(0l, (long) memory.get("workerCount"));
}
public void workerIterationEnd(final Memory memory) {
assertEquals(1l, (long) memory.get("workerCount"));
}*/
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
@Override
@SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
public VertexProgramL clone() {
return new VertexProgramL();
}
@Override
public void storeState(final Configuration configuration) {
VertexProgram.super.storeState(configuration);
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSupportMultipleScopes() throws ExecutionException, InterruptedException {
final ComputerResult result = graphProvider.getGraphComputer(graph).program(new MultiScopeVertexProgram()).submit().get();
assertEquals(result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 0L);
assertEquals(result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L);
assertEquals(result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 1L);
assertEquals(result.graph().traversal().V().has("name", "marko").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 2L);
}
public static class MultiScopeVertexProgram extends StaticVertexProgram<Long> {
private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE);
private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE);
private static final String MEMORY_KEY = "count";
@Override
public void setup(final Memory memory) {
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return Collections.singleton(VertexComputeKey.of(MEMORY_KEY, false));
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
HashSet<MessageScope> scopes = new HashSet<>();
scopes.add(countMessageScopeIn);
scopes.add(countMessageScopeOut);
return scopes;
}
@Override
public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
switch (memory.getIteration()) {
case 0:
if (vertex.value("name").equals("josh")) {
messenger.sendMessage(this.countMessageScopeIn, 2L);
messenger.sendMessage(this.countMessageScopeOut, 1L);
}
break;
case 1:
long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
vertex.property(MEMORY_KEY, edgeCount);
break;
}
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() == 1;
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
}
/////////////////////////////////////////////
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSupportMultipleScopesWithEdgeFunction() throws ExecutionException, InterruptedException {
final ComputerResult result = graphProvider.getGraphComputer(graph).program(new MultiScopeVertexWithEdgeFunctionProgram()).submit().get();
assertEquals(result.graph().traversal().V().has("name", "josh").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 0L);
assertEquals(result.graph().traversal().V().has("name", "lop").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 4L);
assertEquals(result.graph().traversal().V().has("name", "ripple").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 10L);
assertEquals(result.graph().traversal().V().has("name", "marko").next().property(MultiScopeVertexProgram.MEMORY_KEY).value(), 20L);
}
public static class MultiScopeVertexWithEdgeFunctionProgram extends StaticVertexProgram<Long> {
private final MessageScope.Local<Long> countMessageScopeIn = MessageScope.Local.of(__::inE, (m,e) -> m * Math.round(((Double) e.values("weight").next()) * 10));
private final MessageScope.Local<Long> countMessageScopeOut = MessageScope.Local.of(__::outE, (m,e) -> m * Math.round(((Double) e.values("weight").next()) * 10));
private static final String MEMORY_KEY = "count";
@Override
public void setup(final Memory memory) {
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return Collections.singleton(VertexComputeKey.of(MEMORY_KEY, false));
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
HashSet<MessageScope> scopes = new HashSet<>();
scopes.add(countMessageScopeIn);
scopes.add(countMessageScopeOut);
return scopes;
}
@Override
public void execute(Vertex vertex, Messenger<Long> messenger, Memory memory) {
switch (memory.getIteration()) {
case 0:
if (vertex.value("name").equals("josh")) {
messenger.sendMessage(this.countMessageScopeIn, 2L);
messenger.sendMessage(this.countMessageScopeOut, 1L);
}
break;
case 1:
long edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0L, (a, b) -> a + b);
vertex.property(MEMORY_KEY, edgeCount);
break;
}
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() == 1;
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
}
/////////////////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSupportGraphFilter() throws Exception {
// if the graph computer does not support graph filter, then make sure its exception handling is correct
if (!graphProvider.getGraphComputer(graph).features().supportsGraphFilter()) {
try {
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software"));
fail("Should throw an unsupported operation exception");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
}
try {
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(10));
fail("Should throw an unsupported operation exception");
} catch (final UnsupportedOperationException e) {
assertEquals(GraphComputer.Exceptions.graphFilterNotSupported().getMessage(), e.getMessage());
}
return;
}
/// VERTEX PROGRAM
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
/// VERTEX PROGRAM + MAP REDUCE
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
/// MAP REDUCE ONLY
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("software")).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.<Vertex>bothE().limit(0)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(__.<Vertex>outE().limit(1)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
graphProvider.getGraphComputer(graph).edges(outE()).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
// EXCEPTION HANDLING
try {
graphProvider.getGraphComputer(graph).vertices(__.out());
fail();
} catch (final IllegalArgumentException e) {
assertEquals(e.getMessage(), GraphComputer.Exceptions.vertexFilterAccessesIncidentEdges(__.out()).getMessage());
}
try {
graphProvider.getGraphComputer(graph).edges(__.<Vertex>out().outE());
fail();
} catch (final IllegalArgumentException e) {
assertEquals(e.getMessage(), GraphComputer.Exceptions.edgeFilterAccessesAdjacentVertices(__.<Vertex>out().outE()).getMessage());
}
}
public static class VertexProgramM implements VertexProgram {
public static final String SOFTWARE_ONLY = "softwareOnly";
public static final String PEOPLE_ONLY = "peopleOnly";
public static final String KNOWS_ONLY = "knowsOnly";
public static final String PEOPLE_KNOWS_ONLY = "peopleKnowsOnly";
public static final String PEOPLE_KNOWS_WELL_ONLY = "peopleKnowsWellOnly";
public static final String VERTICES_ONLY = "verticesOnly";
public static final String ONE_OUT_EDGE_ONLY = "oneOutEdgeOnly";
public static final String OUT_EDGES_ONLY = "outEdgesOnly";
private String state;
public VertexProgramM() {
}
public VertexProgramM(final String state) {
this.state = state;
}
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
switch (this.state) {
case SOFTWARE_ONLY: {
assertEquals("software", vertex.label());
assertFalse(vertex.edges(Direction.OUT).hasNext());
assertTrue(vertex.edges(Direction.IN).hasNext());
assertTrue(vertex.edges(Direction.IN, "created").hasNext());
assertFalse(vertex.edges(Direction.IN, "knows").hasNext());
break;
}
case PEOPLE_ONLY: {
assertEquals("person", vertex.label());
assertFalse(vertex.edges(Direction.IN, "created").hasNext());
assertTrue(IteratorUtils.count(vertex.edges(Direction.BOTH)) > 0);
break;
}
case KNOWS_ONLY: {
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
if (vertex.value("name").equals("marko"))
assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
else if (vertex.value("name").equals("vadas"))
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
else if (vertex.value("name").equals("josh"))
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
else {
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
}
break;
}
case PEOPLE_KNOWS_ONLY: {
assertEquals("person", vertex.label());
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
if (vertex.value("name").equals("marko"))
assertEquals(2, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
else if (vertex.value("name").equals("vadas"))
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
else if (vertex.value("name").equals("josh"))
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
else {
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
}
break;
}
case PEOPLE_KNOWS_WELL_ONLY: {
assertEquals("person", vertex.label());
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "created")));
if (vertex.value("name").equals("marko")) {
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
assertEquals(1.0, vertex.edges(Direction.OUT, "knows").next().value("weight"), 0.001);
} else if (vertex.value("name").equals("vadas"))
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
else if (vertex.value("name").equals("josh")) {
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.IN, "knows")));
assertEquals(1.0, vertex.edges(Direction.IN, "knows").next().value("weight"), 0.001);
} else {
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH, "knows")));
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
}
break;
}
case VERTICES_ONLY: {
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
break;
}
case ONE_OUT_EDGE_ONLY: {
if (vertex.label().equals("software") || vertex.value("name").equals("vadas"))
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
else {
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.OUT)));
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN)));
assertEquals(1, IteratorUtils.count(vertex.edges(Direction.BOTH)));
}
break;
}
case OUT_EDGES_ONLY: {
if (vertex.label().equals("software") || vertex.value("name").equals("vadas"))
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.BOTH)));
else {
assertTrue(IteratorUtils.count(vertex.edges(Direction.OUT)) > 0);
assertEquals(0, IteratorUtils.count(vertex.edges(Direction.IN)));
assertEquals(IteratorUtils.count(vertex.edges(Direction.OUT)), IteratorUtils.count(vertex.edges(Direction.BOTH)));
}
break;
}
default:
throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
}
}
@Override
public boolean terminate(final Memory memory) {
return true;
}
@Override
public Set<MessageScope> getMessageScopes(Memory memory) {
return Collections.emptySet();
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
@Override
@SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
public VertexProgramM clone() {
return new VertexProgramM(this.state);
}
@Override
public void loadState(final Graph graph, final Configuration configuration) {
this.state = configuration.getString("state");
}
@Override
public void storeState(final Configuration configuration) {
configuration.setProperty("state", this.state);
VertexProgram.super.storeState(configuration);
}
}
private static class MapReduceJ implements MapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
private String state;
public MapReduceJ() {
}
public MapReduceJ(final String state) {
this.state = state;
}
@Override
public void loadState(final Graph graph, final Configuration configuration) {
this.state = configuration.getString("state");
}
@Override
public void storeState(final Configuration configuration) {
configuration.setProperty("state", this.state);
MapReduce.super.storeState(configuration);
}
@Override
@SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
public MapReduceJ clone() {
return new MapReduceJ(this.state);
}
@Override
public boolean doStage(final Stage stage) {
return true;
}
@Override
public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
emitter.emit(1);
switch (this.state) {
case VertexProgramM.SOFTWARE_ONLY: {
assertEquals("software", vertex.label());
break;
}
case VertexProgramM.PEOPLE_ONLY: {
assertEquals("person", vertex.label());
break;
}
case VertexProgramM.KNOWS_ONLY: {
assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
break;
}
case VertexProgramM.PEOPLE_KNOWS_ONLY: {
assertEquals("person", vertex.label());
break;
}
case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
assertEquals("person", vertex.label());
break;
}
case VertexProgramM.VERTICES_ONLY: {
assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
break;
}
case VertexProgramM.ONE_OUT_EDGE_ONLY: {
assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
break;
}
case VertexProgramM.OUT_EDGES_ONLY: {
assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
break;
}
default:
throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
}
}
@Override
public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
this.reduce(key, values, emitter);
}
@Override
public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
int count = 0;
while (values.hasNext()) {
count = count + values.next();
}
emitter.emit(count);
}
@Override
public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
int counter = keyValues.next().getValue();
assertFalse(keyValues.hasNext());
switch (this.state) {
case VertexProgramM.SOFTWARE_ONLY: {
assertEquals(2, counter);
break;
}
case VertexProgramM.PEOPLE_ONLY: {
assertEquals(4, counter);
break;
}
case VertexProgramM.KNOWS_ONLY: {
assertEquals(6, counter);
break;
}
case VertexProgramM.PEOPLE_KNOWS_ONLY: {
assertEquals(4, counter);
break;
}
case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
assertEquals(4, counter);
break;
}
case VertexProgramM.VERTICES_ONLY: {
assertEquals(6, counter);
break;
}
case VertexProgramM.ONE_OUT_EDGE_ONLY: {
assertEquals(6, counter);
break;
}
case VertexProgramM.OUT_EDGES_ONLY: {
assertEquals(6, counter);
break;
}
default:
throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
}
return counter;
}
@Override
public String getMemoryKey() {
return "a";
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldSupportJobChaining() throws Exception {
final ComputerResult result1 = graphProvider.getGraphComputer(graph)
.program(PageRankVertexProgram.build().iterations(5).create(graph)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
final Graph graph1 = result1.graph();
final Memory memory1 = result1.memory();
assertEquals(5, memory1.getIteration());
assertEquals(6, graph1.traversal().V().count().next().intValue());
assertEquals(6, graph1.traversal().E().count().next().intValue());
assertEquals(6, graph1.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
assertEquals(18, graph1.traversal().V().values().count().next().intValue());
//
final ComputerResult result2 = graph1.compute(graphProvider.getGraphComputer(graph1).getClass())
.program(PeerPressureVertexProgram.build().maxIterations(4).create(graph1)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
final Graph graph2 = result2.graph();
final Memory memory2 = result2.memory();
assertTrue(memory2.getIteration() <= 4);
assertEquals(6, graph2.traversal().V().count().next().intValue());
assertEquals(6, graph2.traversal().E().count().next().intValue());
assertEquals(6, graph2.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
assertEquals(24, graph2.traversal().V().values().count().next().intValue());
//
final ComputerResult result3 = graph2.compute(graphProvider.getGraphComputer(graph2).getClass())
.program(TraversalVertexProgram.build().traversal(g.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
final Graph graph3 = result3.graph();
final Memory memory3 = result3.memory();
assertTrue(memory3.keys().contains("m"));
assertTrue(memory3.keys().contains(TraversalVertexProgram.HALTED_TRAVERSERS));
assertEquals(1, memory3.<Map<Long, Long>>get("m").size());
assertEquals(6, memory3.<Map<Long, Long>>get("m").get(1l).intValue());
List<Traverser<String>> traversers = IteratorUtils.list(memory3.<TraverserSet>get(TraversalVertexProgram.HALTED_TRAVERSERS).iterator());
assertEquals(6l, traversers.stream().map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
assertEquals(4l, traversers.stream().filter(s -> s.get().equals("person")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
assertEquals(2l, traversers.stream().filter(s -> s.get().equals("software")).map(Traverser::bulk).reduce((a, b) -> a + b).get().longValue());
assertEquals(6, graph3.traversal().V().count().next().intValue());
assertEquals(6, graph3.traversal().E().count().next().intValue());
assertEquals(0, graph3.traversal().V().values(TraversalVertexProgram.HALTED_TRAVERSERS).count().next().intValue());
assertEquals(6, graph3.traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().intValue());
assertEquals(6, graph3.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
assertEquals(24, graph3.traversal().V().values().count().next().intValue()); // no halted traversers
// TODO: add a test the shows DAG behavior -- splitting another TraversalVertexProgram off of the PeerPressureVertexProgram job.
}
///////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSupportPreExistingComputeKeys() throws Exception {
final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramN()).submit().get();
result.graph().vertices().forEachRemaining(vertex -> {
if (vertex.label().equals("person")) {
if (vertex.value("name").equals("marko"))
assertEquals(32, vertex.<Integer>value("age").intValue());
else if (vertex.value("name").equals("peter"))
assertEquals(38, vertex.<Integer>value("age").intValue());
else if (vertex.value("name").equals("vadas"))
assertEquals(30, vertex.<Integer>value("age").intValue());
else if (vertex.value("name").equals("josh"))
assertEquals(35, vertex.<Integer>value("age").intValue());
else
throw new IllegalStateException("This vertex should not have been accessed: " + vertex);
}
});
}
private static class VertexProgramN extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
if (vertex.label().equals("person"))
vertex.property(VertexProperty.Cardinality.single, "age", vertex.<Integer>value("age") + 1);
}
@Override
public boolean terminate(final Memory memory) {
return memory.getIteration() > 1;
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return Collections.singleton(VertexComputeKey.of("age", false));
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
}
///////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSupportTransientKeys() throws Exception {
final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramO()).mapReduce(new MapReduceK()).submit().get();
result.graph().vertices().forEachRemaining(vertex -> {
assertFalse(vertex.property("v1").isPresent());
assertFalse(vertex.property("v2").isPresent());
assertTrue(vertex.property("v3").isPresent());
assertEquals("shouldExist", vertex.value("v3"));
assertTrue(vertex.property("name").isPresent());
if (vertex.label().equals("software"))
assertTrue(vertex.property("lang").isPresent());
else
assertTrue(vertex.property("age").isPresent());
assertEquals(3, IteratorUtils.count(vertex.properties()));
assertEquals(0, IteratorUtils.count(vertex.properties("v1")));
assertEquals(0, IteratorUtils.count(vertex.properties("v2")));
assertEquals(1, IteratorUtils.count(vertex.properties("v3")));
assertEquals(1, IteratorUtils.count(vertex.properties("name")));
});
assertEquals(6l, result.graph().traversal().V().properties("name").count().next().longValue());
assertEquals(0l, result.graph().traversal().V().properties("v1").count().next().longValue());
assertEquals(0l, result.graph().traversal().V().properties("v2").count().next().longValue());
assertEquals(6l, result.graph().traversal().V().properties("v3").count().next().longValue());
assertEquals(6l, result.graph().traversal().V().<String>values("name").dedup().count().next().longValue());
assertEquals(1l, result.graph().traversal().V().<String>values("v3").dedup().count().next().longValue());
final Traversal<Vertex,String> t = result.graph().traversal().V().<String>values("v3").dedup();
assertEquals("shouldExist", t.next());
CloseableIterator.closeIterator(t);
///
assertFalse(result.memory().exists("m1"));
assertFalse(result.memory().exists("m2"));
assertTrue(result.memory().exists("m3"));
assertEquals(24l, result.memory().<Long>get("m3").longValue());
assertEquals(2, result.memory().keys().size()); // mapReduceK
}
private static class VertexProgramO extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
assertFalse(memory.exists("m1"));
assertFalse(memory.exists("m2"));
assertFalse(memory.exists("m3"));
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
if (memory.isInitialIteration()) {
assertFalse(vertex.property("v1").isPresent());
assertFalse(vertex.property("v2").isPresent());
assertFalse(vertex.property("v3").isPresent());
vertex.property("v1", "shouldNotExist");
vertex.property("v2", "shouldNotExist");
vertex.property("v3", "shouldExist");
assertTrue(vertex.property("v1").isPresent());
assertTrue(vertex.property("v2").isPresent());
assertTrue(vertex.property("v3").isPresent());
assertEquals("shouldNotExist", vertex.value("v1"));
assertEquals("shouldNotExist", vertex.value("v2"));
assertEquals("shouldExist", vertex.value("v3"));
//
assertFalse(memory.exists("m1"));
assertFalse(memory.exists("m2"));
assertFalse(memory.exists("m3"));
memory.add("m1", false);
memory.add("m2", true);
memory.add("m3", 2l);
// should still not exist as this pulls from the master memory
assertFalse(memory.exists("m1"));
assertFalse(memory.exists("m2"));
assertFalse(memory.exists("m3"));
} else {
assertTrue(vertex.property("v1").isPresent());
assertTrue(vertex.property("v2").isPresent());
assertTrue(vertex.property("v3").isPresent());
assertEquals("shouldNotExist", vertex.value("v1"));
assertEquals("shouldNotExist", vertex.value("v2"));
assertEquals("shouldExist", vertex.value("v3"));
//
assertTrue(memory.exists("m1"));
assertTrue(memory.exists("m2"));
assertTrue(memory.exists("m3"));
assertFalse(memory.get("m1"));
assertTrue(memory.get("m2"));
assertEquals(12l, memory.<Long>get("m3").longValue());
memory.add("m1", true);
memory.add("m2", true);
memory.add("m3", 2l);
}
}
@Override
public boolean terminate(final Memory memory) {
assertTrue(memory.exists("m1"));
assertTrue(memory.exists("m2"));
assertTrue(memory.exists("m3"));
if (memory.isInitialIteration()) {
assertFalse(memory.get("m1"));
assertTrue(memory.get("m2"));
assertEquals(12l, memory.<Long>get("m3").longValue());
return false;
} else {
assertTrue(memory.get("m1"));
assertTrue(memory.get("m2"));
assertEquals(24l, memory.<Long>get("m3").longValue());
return true;
}
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return new HashSet<>(Arrays.asList(
MemoryComputeKey.of("m1", Operator.or, true, true),
MemoryComputeKey.of("m2", Operator.and, true, true),
MemoryComputeKey.of("m3", Operator.sum, true, false)));
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return new HashSet<>(Arrays.asList(
VertexComputeKey.of("v1", true),
VertexComputeKey.of("v2", true),
VertexComputeKey.of("v3", false)));
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
}
public static class MapReduceK extends StaticMapReduce {
@Override
public boolean doStage(final Stage stage) {
return stage.equals(Stage.MAP);
}
@Override
public void map(final Vertex vertex, final MapEmitter emitter) {
assertFalse(vertex.property("v1").isPresent());
assertFalse(vertex.property("v2").isPresent());
assertTrue(vertex.property("v3").isPresent());
assertTrue(vertex.property("name").isPresent());
assertEquals(3, IteratorUtils.count(vertex.properties()));
assertEquals(3, IteratorUtils.count(vertex.values()));
}
@Override
public String getMemoryKey() {
return "mapReduceK";
}
@Override
public Object generateFinalResult(final Iterator keyValues) {
return "anObject";
}
}
///////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSupportBroadcastKeys() throws Exception {
final ComputerResult result = graphProvider.getGraphComputer(graph).program(new VertexProgramP()).submit().get();
assertTrue(result.memory().exists("m1"));
assertFalse(result.memory().exists("m2"));
assertFalse(result.memory().exists("m3"));
assertTrue(result.memory().exists("m4"));
assertTrue(result.memory().get("m1"));
assertEquals(-18, result.memory().<Integer>get("m4").intValue());
assertEquals(2, result.memory().keys().size());
}
private static class VertexProgramP extends StaticVertexProgram {
@Override
public void setup(final Memory memory) {
assertFalse(memory.exists("m1")); // or
assertFalse(memory.exists("m2")); // and
assertFalse(memory.exists("m3")); // long
assertFalse(memory.exists("m4")); // int
memory.set("m1", false);
memory.set("m2", true);
memory.set("m3", 0l);
memory.set("m4", 0);
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
if (memory.isInitialIteration()) {
assertFalse(memory.exists("m1"));
assertTrue(memory.exists("m2"));
assertTrue(memory.get("m2"));
assertFalse(memory.exists("m3"));
assertTrue(memory.exists("m4"));
assertEquals(0, memory.<Integer>get("m4").intValue());
memory.add("m1", false);
memory.add("m2", true);
memory.add("m3", 1l);
memory.add("m4", -1);
} else {
assertFalse(memory.exists("m1")); // no broadcast
assertTrue(memory.exists("m2"));
assertFalse(memory.exists("m3")); // no broadcast
assertTrue(memory.exists("m4"));
try {
assertFalse(memory.get("m1"));
fail();
} catch (final Exception e) {
validateException(Memory.Exceptions.memoryDoesNotExist("m1"), e);
}
assertTrue(memory.get("m2"));
try {
assertEquals(6l, memory.<Long>get("m3").longValue());
fail();
} catch (final Exception e) {
validateException(Memory.Exceptions.memoryDoesNotExist("m3"), e);
}
assertEquals(-6l, memory.<Integer>get("m4").intValue());
///
memory.add("m1", true);
memory.add("m2", true);
memory.add("m3", 2l);
memory.add("m4", -2);
}
}
@Override
public boolean terminate(final Memory memory) {
assertTrue(memory.exists("m1"));
assertTrue(memory.exists("m2"));
assertTrue(memory.exists("m3"));
assertTrue(memory.exists("m4"));
if (memory.isInitialIteration()) {
assertFalse(memory.get("m1"));
assertTrue(memory.get("m2"));
assertEquals(6l, memory.<Long>get("m3").longValue());
assertEquals(-6, memory.<Integer>get("m4").intValue());
return false;
} else {
assertTrue(memory.get("m1"));
assertTrue(memory.get("m2"));
assertEquals(18l, memory.<Long>get("m3").longValue());
assertEquals(-18, memory.<Integer>get("m4").intValue());
return true;
}
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return new HashSet<>(Arrays.asList(
MemoryComputeKey.of("m1", Operator.or, false, false),
MemoryComputeKey.of("m2", Operator.and, true, true),
MemoryComputeKey.of("m3", Operator.sum, false, true),
MemoryComputeKey.of("m4", Operator.sum, true, false)));
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
}
///////////////////////////////////
@Test
@LoadGraphWith(MODERN)
public void shouldSucceedWithProperTraverserRequirements() throws Exception {
final VertexProgramQ vp = VertexProgramQ.build().property("pl").create();
final Map<String, List<Integer>> expected = new HashMap<>();
expected.put("vadas", Collections.singletonList(2));
expected.put("lop", Arrays.asList(2, 2, 2, 3));
expected.put("josh", Collections.singletonList(2));
expected.put("ripple", Arrays.asList(2, 3));
try {
g.V().repeat(__.out()).emit().program(vp).dedup()
.valueMap("name", "pl").forEachRemaining((Map<Object, Object> map) -> {
final String name = (String) ((List) map.get("name")).get(0);
final List<Integer> pathLengths = (List<Integer>) map.get("pl");
assertTrue(expected.containsKey(name));
final List<Integer> expectedPathLengths = expected.remove(name);
assertTrue(expectedPathLengths.containsAll(pathLengths));
assertTrue(pathLengths.containsAll(expectedPathLengths));
});
assertTrue(expected.isEmpty());
} catch (VerificationException ex) {
assumeNoException(ex);
}
}
@Test
@LoadGraphWith(MODERN)
public void shouldFailWithImproperTraverserRequirements() throws Exception {
final VertexProgramQ vp = VertexProgramQ.build().property("pl").useTraverserRequirements(false).create();
try {
g.V().repeat(__.out()).emit().program(vp).dedup()
.forEachRemaining((Vertex v) -> assertFalse(v.property("pl").isPresent()));
} catch (VerificationException ex) {
assumeNoException(ex);
}
}
private static class VertexProgramQ implements VertexProgram<Object> {
private static final String VERTEX_PROGRAM_Q_CFG_PREFIX = "gremlin.vertexProgramQ";
private static final String PROPERTY_CFG_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".property";
private static final String LENGTHS_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".lengths";
private static final String USE_TRAVERSER_REQUIREMENTS_CFG_KEY = VERTEX_PROGRAM_Q_CFG_PREFIX + ".useTraverserRequirements";
private final static Set<MemoryComputeKey> MEMORY_COMPUTE_KEYS = Collections.singleton(
MemoryComputeKey.of(LENGTHS_KEY, Operator.addAll, true, true)
);
private final Set<VertexComputeKey> elementComputeKeys;
private Configuration configuration;
private String propertyKey;
private Set<TraverserRequirement> traverserRequirements;
private VertexProgramQ() {
elementComputeKeys = new HashSet<>();
}
public static Builder build() {
return new Builder();
}
static class Builder extends AbstractVertexProgramBuilder<Builder> {
private Builder() {
super(VertexProgramQ.class);
}
@SuppressWarnings("unchecked")
@Override
public VertexProgramQ create(final Graph graph) {
if (graph != null) {
ConfigurationUtils.append(graph.configuration().subset(VERTEX_PROGRAM_Q_CFG_PREFIX), configuration);
}
return (VertexProgramQ) VertexProgram.createVertexProgram(graph, configuration);
}
public Builder property(final String name) {
configuration.setProperty(PROPERTY_CFG_KEY, name);
return this;
}
/**
* This is only configurable for the purpose of testing. In a real-world VP this would be a bad pattern.
*/
public Builder useTraverserRequirements(final boolean value) {
configuration.setProperty(USE_TRAVERSER_REQUIREMENTS_CFG_KEY, value);
return this;
}
}
@Override
public void storeState(final Configuration config) {
VertexProgram.super.storeState(config);
if (configuration != null) {
ConfigurationUtils.copy(configuration, config);
}
}
@Override
public void loadState(final Graph graph, final Configuration config) {
configuration = new BaseConfiguration();
if (config != null) {
ConfigurationUtils.copy(config, configuration);
}
propertyKey = configuration.getString(PROPERTY_CFG_KEY);
traverserRequirements = configuration.getBoolean(USE_TRAVERSER_REQUIREMENTS_CFG_KEY, true)
? Collections.singleton(TraverserRequirement.PATH) : Collections.emptySet();
elementComputeKeys.add(VertexComputeKey.of(propertyKey, false));
}
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
if (memory.isInitialIteration()) {
final Property<TraverserSet> haltedTraversers = vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS);
if (!haltedTraversers.isPresent()) return;
final Iterator iterator = haltedTraversers.value().iterator();
if (iterator.hasNext()) {
while (iterator.hasNext()) {
final Traverser t = (Traverser) iterator.next();
if (!(t.path() instanceof EmptyPath)) {
final int pathLength = t.path().size();
final List<Pair<Vertex, Integer>> memoryValue = new LinkedList<>();
memoryValue.add(Pair.with(vertex, pathLength));
memory.add(LENGTHS_KEY, memoryValue);
}
}
}
} else {
if (memory.exists(LENGTHS_KEY)) {
final List<Pair<Vertex, Integer>> lengths = memory.get(LENGTHS_KEY);
for (final Pair<Vertex, Integer> pair : lengths) {
if (pair.getValue0().equals(vertex)) {
vertex.property(VertexProperty.Cardinality.list, propertyKey, pair.getValue1());
}
}
}
}
}
@Override
public boolean terminate(final Memory memory) {
return !memory.isInitialIteration();
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.emptySet();
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return elementComputeKeys;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return MEMORY_COMPUTE_KEYS;
}
@SuppressWarnings({"CloneDoesntDeclareCloneNotSupportedException", "CloneDoesntCallSuperClone"})
@Override
public VertexProgram<Object> clone() {
return this;
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
@Override
public Set<TraverserRequirement> getTraverserRequirements() {
return this.traverserRequirements;
}
@Override
public Features getFeatures() {
return new Features() {
@Override
public boolean requiresVertexPropertyAddition() {
return true;
}
};
}
}
///////////////////////////////////
@Test
@LoadGraphWith(SINK)
public void testMessagePassingIn() throws Exception {
runMPTest(Direction.IN).forEachRemaining(v -> {
vertexPropertyChecks(v);
final String in = v.value(VertexProgramR.PROPERTY_IN);
if (in.equals("a"))
assertEquals("ab", v.value(VertexProgramR.PROPERTY_OUT).toString());
else if (in.equals("b"))
assertEquals("", v.value(VertexProgramR.PROPERTY_OUT).toString());
else
throw new IllegalStateException("This vertex should not exist: " + VertexProgramR.PROPERTY_IN
+ "=" + String.valueOf(in));
});
}
@Test
@LoadGraphWith(SINK)
public void testMessagePassingOut() throws Exception {
runMPTest(Direction.OUT).forEachRemaining(v -> {
vertexPropertyChecks(v);
final String in = v.value(VertexProgramR.PROPERTY_IN);
if (in.equals("a"))
assertEquals("a", v.value(VertexProgramR.PROPERTY_OUT).toString());
else if (in.equals("b"))
assertEquals("a", v.value(VertexProgramR.PROPERTY_OUT).toString());
else
throw new IllegalStateException("This vertex should not exist: " + VertexProgramR.PROPERTY_IN
+ "=" + String.valueOf(in));
});
}
@Test
@LoadGraphWith(SINK)
public void testMessagePassingBoth() throws Exception {
runMPTest(Direction.BOTH).forEachRemaining(v -> {
vertexPropertyChecks(v);
final String in = v.value(VertexProgramR.PROPERTY_IN);
switch (in) {
case "a":
assertEquals("aab", v.value(VertexProgramR.PROPERTY_OUT).toString());
break;
case "b":
assertEquals("a", v.value(VertexProgramR.PROPERTY_OUT).toString());
break;
default:
throw new IllegalStateException("This vertex should not exist: " + VertexProgramR.PROPERTY_IN
+ "=" + String.valueOf(in));
}
});
}
private GraphTraversal<Vertex, Vertex> runMPTest(final Direction direction) throws Exception {
final VertexProgramR svp = VertexProgramR.build().direction(direction).create();
final ComputerResult result = graphProvider.getGraphComputer(graph).program(svp).vertices(__.hasLabel(VertexProgramR.VERTEX_LABEL)).submit().get();
return result.graph().traversal().V().hasLabel(VertexProgramR.VERTEX_LABEL);
}
private static void vertexPropertyChecks(final Vertex v) {
assertEquals(2, v.keys().size());
assertTrue(v.keys().contains(VertexProgramR.PROPERTY_IN));
assertTrue(v.keys().contains(VertexProgramR.PROPERTY_OUT));
assertEquals(1, IteratorUtils.count(v.values(VertexProgramR.PROPERTY_IN)));
assertEquals(1, IteratorUtils.count(v.values(VertexProgramR.PROPERTY_OUT)));
}
private static class VertexProgramR implements VertexProgram<String> {
private static final String SIMPLE_VERTEX_PROGRAM_CFG_PREFIX = "gremlin.simpleVertexProgram";
private static final String PROPERTY_OUT = "propertyout";
private static final String PROPERTY_IN = "name";
private static final String VERTEX_LABEL = "message";
private static final String DIRECTION_CFG_KEY = SIMPLE_VERTEX_PROGRAM_CFG_PREFIX + ".direction";
private Direction direction;
private final MessageScope.Local<String> inMessageScope = MessageScope.Local.of(__::inE);
private final MessageScope.Local<String> outMessageScope = MessageScope.Local.of(__::outE);
private final MessageScope.Local<String> bothMessageScope = MessageScope.Local.of(__::bothE);
private MessageScope.Local<String> messageScope;
private final Set<VertexComputeKey> vertexComputeKeys = new HashSet<>(Arrays.asList(
VertexComputeKey.of(VertexProgramR.PROPERTY_OUT, false),
VertexComputeKey.of(VertexProgramR.PROPERTY_IN, false)));
/**
* Clones this vertex program.
*
* @return a clone of this vertex program
*/
public VertexProgramR clone() { return this; }
@Override
public void loadState(final Graph graph, final Configuration configuration) {
direction = Direction.valueOf(configuration.getString(DIRECTION_CFG_KEY));
switch (direction) {
case IN:
this.messageScope = this.inMessageScope;
break;
case OUT:
this.messageScope = this.outMessageScope;
break;
case BOTH:
this.messageScope = this.bothMessageScope;
break;
default:
throw new IllegalStateException("Should not reach this point!");
}
}
@Override
public void storeState(final Configuration configuration) {
VertexProgram.super.storeState(configuration);
configuration.setProperty(DIRECTION_CFG_KEY, direction.name());
}
@Override
public void setup(final Memory memory) {
}
@Override
public void execute(final Vertex vertex, final Messenger<String> messenger, final Memory memory) {
if (memory.isInitialIteration()) {
messenger.sendMessage(this.messageScope, vertex.value(PROPERTY_IN).toString());
} else {
final char[] composite = IteratorUtils.reduce(messenger.receiveMessages(), "", (a, b) -> a + b).toCharArray();
Arrays.sort(composite);
vertex.property(PROPERTY_OUT, new String(composite));
}
}
@Override
public boolean terminate(final Memory memory) {
return !memory.isInitialIteration();
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return Collections.singleton(this.messageScope);
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
return this.vertexComputeKeys;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return Collections.emptySet();
}
public static Builder build() {
return new Builder();
}
static class Builder extends AbstractVertexProgramBuilder<Builder> {
private Builder() {
super(VertexProgramR.class);
}
@SuppressWarnings("unchecked")
@Override
public VertexProgramR create(final Graph graph) {
if (graph != null) {
ConfigurationUtils.append(graph.configuration().subset(SIMPLE_VERTEX_PROGRAM_CFG_PREFIX), configuration);
}
return (VertexProgramR) VertexProgram.createVertexProgram(graph, configuration);
}
public Builder direction(final Direction direction) {
configuration.setProperty(DIRECTION_CFG_KEY, direction.toString());
return this;
}
}
}
}