Merge branch 'TINKERPOP-2919' into 3.5-dev
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 0926bbb..27d861f 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -32,6 +32,7 @@
 * Changed `with()` configuration for `ARGS_BATCH_SIZE` and `ARGS_EVAL_TIMEOUT` to be more forgiving on the type of `Number` used for the value.
 * Changed `gremlin-console` to add imports via an ImportCustomizer to reduce time spent resolving imports.
 * Bumped to Groovy 2.5.21.
+* Refactored `FilterRankingStrategy` to improve performance for deeply nested traversals.
 * Added `AuthInfoProvider` interface and `NewDynamicAuth()` to gremlin-go for dynamic authentication support.
 * Bumped to `snakeyaml` 2.0 to fix security vulnerability.
 * Bumped to Apache `commons-configuration` 2.9.0 to fix security vulnerability.
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/FilterRankingStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/FilterRankingStrategy.java
index 8bb7d68..65723a2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/FilterRankingStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/optimization/FilterRankingStrategy.java
@@ -38,10 +38,18 @@
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.javatuples.Pair;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * {@code FilterRankingStrategy} reorders filter- and order-steps according to their rank. Step ranks are defined within
@@ -61,33 +69,63 @@
     private static final FilterRankingStrategy INSTANCE = new FilterRankingStrategy();
     private static final Set<Class<? extends OptimizationStrategy>> PRIORS = Collections.singleton(IdentityRemovalStrategy.class);
 
-    private FilterRankingStrategy() {
-    }
+    private FilterRankingStrategy() { }
 
     @Override
     public void apply(final Traversal.Admin<?, ?> traversal) {
-        boolean modified = true;
-        while (modified) {
-            modified = false;
-            final List<Step> steps = traversal.getSteps();
-            for (int i = 0; i < steps.size() - 1; i++) {
-                final Step<?, ?> step = steps.get(i);
-                final Step<?, ?> nextStep = step.getNextStep();
-                if (!usesLabels(nextStep, step.getLabels())) {
-                    final int nextRank = getStepRank(nextStep);
-                    if (nextRank != 0) {
-                        if (!step.getLabels().isEmpty()) {
-                            TraversalHelper.copyLabels(step, nextStep, true);
-                            modified = true;
-                        }
-                        if (getStepRank(step) > nextRank) {
-                            traversal.removeStep(nextStep);
-                            traversal.addStep(i, nextStep);
-                            modified = true;
+        // this strategy is only applied to the root (recursively) because it uses a tiny cache which will drastically
+        // speed up the ranking function in the event of encountering traversals with significant depth and branching.
+        // if we let normal strategy application apply then the cache will reset since this strategy and the traversal
+        // does not hold any state about that cache. tried using the marker pattern used in other strategies but that
+        // didn't work so well.
+        if (traversal.isRoot()) {
+            // TraversalParent steps require a costly function to calculate if labels are in use in their child
+            // traversals. This little cache keeps the effective data of that function which is if there is a
+            // lambda in the children and the set of scope keys. note that the lambda sorta trumps the labels in
+            // that if there is a lambda there's no real point to doing any sort of eval of the labels.
+            final Map<TraversalParent, Pair<Boolean, Set<String>>> traversalParentCache = new HashMap<>();
+            final Map<Step, Integer> stepRanking = new HashMap<>();
+
+            // build up the little cache
+            final Map<TraversalParent, List<Step<?,?>>> m =
+                    TraversalHelper.getStepsOfAssignableClassRecursively(traversal, Scoping.class, LambdaHolder.class).stream().
+                    collect(Collectors.groupingBy(step -> ((Step) step).getTraversal().getParent()));
+            m.forEach((k, v) -> {
+                final boolean hasLambda = v.stream().anyMatch(s -> s instanceof LambdaHolder);
+                if (hasLambda) {
+                    traversalParentCache.put(k, Pair.with(true, Collections.emptySet()));
+                } else {
+                    traversalParentCache.put(k, Pair.with(false, v.stream().filter(s -> s instanceof Scoping).
+                            flatMap(s -> ((Scoping) s).getScopeKeys().stream()).collect(Collectors.toSet())));
+                }
+            });
+
+            TraversalHelper.applyTraversalRecursively(t -> {
+                boolean modified = true;
+                while (modified) {
+                    modified = false;
+                    final List<Step> steps = t.getSteps();
+                    for (int i = 0; i < steps.size() - 1; i++) {
+                        final Step<?, ?> step = steps.get(i);
+                        final Set<String> labels = step.getLabels();
+                        final Step<?, ?> nextStep = step.getNextStep();
+                        if (!usesLabels(nextStep, labels, traversalParentCache)) {
+                            final int nextRank = stepRanking.computeIfAbsent(nextStep, FilterRankingStrategy::getStepRank);
+                            if (nextRank != 0) {
+                                if (!step.getLabels().isEmpty()) {
+                                    TraversalHelper.copyLabels(step, nextStep, true);
+                                    modified = true;
+                                }
+                                if (stepRanking.computeIfAbsent(step, FilterRankingStrategy::getStepRank) > nextRank) {
+                                    t.removeStep(nextStep);
+                                    t.addStep(i, nextStep);
+                                    modified = true;
+                                }
+                            }
                         }
                     }
                 }
-            }
+            }, traversal);
         }
     }
 
@@ -144,19 +182,29 @@
         return maxStepRank;
     }
 
-    private static boolean usesLabels(final Step<?, ?> step, final Set<String> labels) {
+    private static boolean usesLabels(final Step<?, ?> step, final Set<String> labels,
+                                      final Map<TraversalParent, Pair<Boolean, Set<String>>> traversalParentCache) {
         if (step instanceof LambdaHolder)
             return true;
-        if (step instanceof Scoping) {
+        if (step instanceof Scoping && !labels.isEmpty()) {
             final Set<String> scopes = ((Scoping) step).getScopeKeys();
             for (final String label : labels) {
                 if (scopes.contains(label))
                     return true;
             }
         }
+
         if (step instanceof TraversalParent) {
-            if (TraversalHelper.anyStepRecursively(s -> usesLabels(s, labels), (TraversalParent) step))
+            // when the step is a parent and is not in the cache it means it's not gonna be using labels
+            if (!traversalParentCache.containsKey(step)) return false;
+
+            // if we do have a pair then check the boolean first, as it instantly means labels are in use
+            // (or i guess can't be detected because it's a lambda)
+            final Pair<Boolean, Set<String>> p = traversalParentCache.get(step);
+            if (p.getValue0())
                 return true;
+            else
+                return p.getValue1().stream().anyMatch(labels::contains);
         }
         return false;
     }
@@ -169,4 +217,4 @@
     public static FilterRankingStrategy instance() {
         return INSTANCE;
     }
-}
+}
\ No newline at end of file
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
index 6500c4e..2099ff8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelper.java
@@ -290,6 +290,29 @@
         return list;
     }
 
+    /**
+     * Get steps of the specified classes throughout the traversal.
+     */
+    public static List<Step<?,?>> getStepsOfAssignableClassRecursively(final Traversal.Admin<?, ?> traversal, final Class<?>... stepClasses) {
+        final List<Step<?,?>> list = new ArrayList<>();
+        for (final Step<?, ?> step : traversal.getSteps()) {
+            for (Class<?> stepClass : stepClasses) {
+                if (stepClass.isAssignableFrom(step.getClass()))
+                    list.add(step);
+            }
+
+            if (step instanceof TraversalParent) {
+                for (final Traversal.Admin<?, ?> localChild : ((TraversalParent) step).getLocalChildren()) {
+                    list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(localChild, stepClasses));
+                }
+                for (final Traversal.Admin<?, ?> globalChild : ((TraversalParent) step).getGlobalChildren()) {
+                    list.addAll(TraversalHelper.getStepsOfAssignableClassRecursively(globalChild, stepClasses));
+                }
+            }
+        }
+        return list;
+    }
+
     public static boolean isGlobalChild(Traversal.Admin<?, ?> traversal) {
         while (!(traversal.isRoot())) {
             if (traversal.getParent().getLocalChildren().contains(traversal))
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelperTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelperTest.java
index 66ef474..5e277ad 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelperTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/util/TraversalHelperTest.java
@@ -34,6 +34,7 @@
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.WhereTraversalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.filter.NotStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.FlatMapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.PropertiesStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalFlatMapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.TraversalMapStep;
@@ -419,6 +420,30 @@
     @Test
     public void shouldFindStepsRecursively() {
         final Traversal<?,?> traversal = __.V().repeat(__.out().simplePath());
-        assertTrue(TraversalHelper.anyStepRecursively(s -> s instanceof PathFilterStep, traversal.asAdmin()));
+        assertThat(TraversalHelper.anyStepRecursively(s -> s instanceof PathFilterStep, traversal.asAdmin()), is(true));
+    }
+
+    @Test
+    public void shouldGetStepsOfAssignableClassRecursivelyNoTypes() {
+        final Traversal.Admin<?,?> traversal = __.V().repeat(__.out()).project("x").by(out().in().fold()).asAdmin();
+        final List<Step<?,?>> steps = TraversalHelper.getStepsOfAssignableClassRecursively(traversal);
+        assertEquals(0, steps.size());
+    }
+
+    @Test
+    public void shouldGetStepsOfAssignableClassRecursivelyOneType() {
+        final Traversal.Admin<?,?> traversal = __.V().repeat(__.out()).project("x").by(out().in().fold()).asAdmin();
+        final List<Step<?,?>> steps = TraversalHelper.getStepsOfAssignableClassRecursively(traversal, VertexStep.class);
+        assertEquals(3, steps.size());
+        assertThat(steps.stream().allMatch(s -> s instanceof VertexStep), is(true));
+    }
+
+    @Test
+    public void shouldGetStepsOfAssignableClassRecursivelyMultipleTypes() {
+        final Traversal.Admin<?,?> traversal = __.V().repeat(__.out()).project("x").by(out().in().fold()).asAdmin();
+        final List<Step<?,?>> steps = TraversalHelper.getStepsOfAssignableClassRecursively(traversal, VertexStep.class, FoldStep.class);
+        assertEquals(4, steps.size());
+        assertEquals(3, steps.stream().filter(s -> s instanceof VertexStep).count());
+        assertEquals(1, steps.stream().filter(s -> s instanceof FoldStep).count());
     }
 }