blob: e42e5fa5cbb808f6fef86cc0be92a803eb9d4a81 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.Profiling;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.DependantMutableMetrics;
import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics;
import org.apache.tinkerpop.gremlin.process.traversal.util.StandardTraversalMetrics;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMetrics;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
 * A pass-through step that brackets every {@link #next()} and {@link #hasNext()} call with start/stop calls on a
 * shared {@link StandardTraversalMetrics}, which is stored in the traversal's side-effects under
 * {@link TraversalMetrics#METRICS_KEY}.
 * <p>
 * {@link #prepTraversalForProfiling(Traversal.Admin, MutableMetrics)} expects the steps of a profiled traversal to
 * alternate between a regular step (even index) and the {@code ProfileStep} that measures it (odd index).
 *
 * @author Bob Briody (http://bobbriody.com)
 */
public final class ProfileStep<S> extends AbstractStep<S, S> implements MapReducer<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {

    // Stored in the Traversal sideEffects but kept here as a reference for convenience.
    private StandardTraversalMetrics traversalMetrics;

    /**
     * Creates a profile step attached to the given traversal.
     *
     * @param traversal the traversal this step belongs to
     */
    public ProfileStep(final Traversal.Admin traversal) {
        super(traversal);
    }

    /**
     * Returns the shared {@link ProfileMapReduce} singleton used to merge metrics.
     *
     * @return the {@link ProfileMapReduce} instance
     */
    @Override
    public MapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> getMapReduce() {
        return ProfileMapReduce.instance();
    }

    /**
     * Delegates to {@code super.next()} while the metrics for this step are started.
     * <p>
     * When a traverser was obtained, the metrics are finished with the traverser's bulk; when
     * {@code super.next()} threw (so no traverser is available) the metrics are only stopped.
     *
     * @return the next traverser
     */
    @Override
    public Traverser<S> next() {
        Traverser<S> ret = null;
        initializeIfNeeded();
        traversalMetrics.start(this.getId());
        try {
            ret = super.next();
            return ret;
        } finally {
            if (ret != null) {
                traversalMetrics.finish(this.getId(), ret.asAdmin().bulk());
            } else {
                traversalMetrics.stop(this.getId());
            }
        }
    }

    /**
     * Delegates to {@code super.hasNext()} while the metrics for this step are started.
     *
     * @return the result of {@code super.hasNext()}
     */
    @Override
    public boolean hasNext() {
        initializeIfNeeded();
        traversalMetrics.start(this.getId());
        try {
            return super.hasNext();
        } finally {
            // Mirror next(): always pair start() with stop(), even when the upstream call throws, so the
            // metrics for this step are never left in the started state.
            traversalMetrics.stop(this.getId());
        }
    }

    /**
     * Passes the next incoming traverser through unchanged.
     *
     * @return the next traverser from {@code starts}
     * @throws NoSuchElementException declared by the overridden method; presumably raised by
     *                                {@code starts.next()} when no traverser is left
     */
    @Override
    protected Traverser<S> processNextStart() throws NoSuchElementException {
        return this.starts.next();
    }

    /**
     * Ensures {@link #traversalMetrics} is set before it is used: first by creating the side-effect if this is
     * the step responsible for that, otherwise by looking it up in the root traversal's side-effects.
     */
    private void initializeIfNeeded() {
        if (traversalMetrics != null) {
            return;
        }
        createTraversalMetricsSideEffectIfNecessary();
        // How can traversalMetrics still be null? When running on computer it may need to be re-initialized from
        // sideEffects after serialization.
        if (traversalMetrics == null) {
            // look up the TraversalMetrics in the root traversal's sideEffects
            Traversal t = this.getTraversal();
            // Climb parent steps until reaching the traversal whose parent is an EmptyStep (the root).
            while (!(t.asAdmin().getParent() instanceof EmptyStep)) {
                t = t.asAdmin().getParent().asStep().getTraversal();
            }
            traversalMetrics = t.asAdmin().getSideEffects().<StandardTraversalMetrics>get(TraversalMetrics.METRICS_KEY).get();
        }
    }

    /**
     * Creates the {@link StandardTraversalMetrics} side-effect and prepares every step of the traversal for
     * profiling. Does nothing when the side-effect already exists or when this step's traversal is nested
     * (its parent is not an {@link EmptyStep}).
     */
    private void createTraversalMetricsSideEffectIfNecessary() {
        if (this.getTraversal().getSideEffects().get(TraversalMetrics.METRICS_KEY).isPresent()) {
            // Already initialized
            return;
        }
        if (!(this.getTraversal().getParent() instanceof EmptyStep)) {
            // Initialization is handled at the top-level of the traversal only.
            return;
        }
        // The following code is executed once per top-level (non-nested) Traversal for all Profile steps. (Technically,
        // once per thread if using Computer.)
        traversalMetrics = this.getTraversal().getSideEffects().getOrCreate(TraversalMetrics.METRICS_KEY, StandardTraversalMetrics::new);
        prepTraversalForProfiling(this.getTraversal().asAdmin(), null);
    }

    /**
     * Walks the traversal steps and initializes the Metrics timers, recursing into local and global child
     * traversals of any {@link TraversalParent} step.
     * <p>
     * Steps are consumed in pairs: index {@code ii} is the measured step and index {@code ii + 1} is cast to the
     * {@code ProfileStep} measuring it. A trailing unpaired step is not visited.
     *
     * @param traversal     the traversal whose steps are prepared
     * @param parentMetrics the metrics of the enclosing step, or {@code null} for the top-level traversal
     */
    private void prepTraversalForProfiling(Traversal.Admin<?, ?> traversal, MutableMetrics parentMetrics) {
        DependantMutableMetrics prevMetrics = null;
        final List<Step> steps = traversal.getSteps();
        for (int ii = 0; ii + 1 < steps.size(); ii = ii + 2) {
            Step step = steps.get(ii);
            ProfileStep profileStep = (ProfileStep) steps.get(ii + 1);

            // Create metrics
            MutableMetrics metrics;

            // Computer metrics are "stand-alone" but Standard metrics handle double-counted upstream time.
            if (traversal.getEngine().isComputer()) {
                metrics = new MutableMetrics(step.getId(), step.toString());
            } else {
                // Each standard metric is linked to the metric created for the previous pair in this traversal.
                metrics = new DependantMutableMetrics(step.getId(), step.toString(), prevMetrics);
                prevMetrics = (DependantMutableMetrics) metrics;
            }

            if (step instanceof Profiling) {
                ((Profiling) step).setMetrics(metrics);
            }

            // Initialize counters (necessary because some steps might end up being 0)
            metrics.incrementCount(TraversalMetrics.ELEMENT_COUNT_ID, 0);
            metrics.incrementCount(TraversalMetrics.TRAVERSER_COUNT_ID, 0);

            // Add metrics to parent, if necessary
            if (parentMetrics != null) {
                parentMetrics.addNested(metrics);
            }

            // The TraversalMetrics sideEffect is shared across all the steps.
            profileStep.traversalMetrics = this.traversalMetrics;

            // Add root metrics to traversalMetrics
            // (ii / 2 is the position of the step/profile pair; the boolean is true only for top-level metrics.)
            this.traversalMetrics.addMetrics(metrics, step.getId(), ii / 2, parentMetrics == null, profileStep.getId());

            // Handle nested traversal
            if (step instanceof TraversalParent) {
                for (Traversal.Admin<?, ?> t : ((TraversalParent) step).getLocalChildren()) {
                    prepTraversalForProfiling(t, metrics);
                }
                for (Traversal.Admin<?, ?> t : ((TraversalParent) step).getGlobalChildren()) {
                    prepTraversalForProfiling(t, metrics);
                }
            }
        }
    }

    //////////////////

    /**
     * MapReduce job that collects the {@link StandardTraversalMetrics} side-effect found on each vertex and merges
     * them into a single result stored under {@link TraversalMetrics#METRICS_KEY}.
     */
    public static final class ProfileMapReduce extends StaticMapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {

        // Singleton; never reassigned, hence final.
        private static final ProfileMapReduce INSTANCE = new ProfileMapReduce();

        private ProfileMapReduce() {
        }

        /**
         * Reports that every stage is executed.
         *
         * @param stage the stage being queried
         * @return always {@code true}
         */
        @Override
        public boolean doStage(final Stage stage) {
            return true;
        }

        /**
         * @return {@link TraversalMetrics#METRICS_KEY}
         */
        @Override
        public String getMemoryKey() {
            return TraversalMetrics.METRICS_KEY;
        }

        /**
         * Emits the vertex's metrics side-effect, if one is present.
         *
         * @param vertex  the vertex whose traversal side-effects are inspected
         * @param emitter receives the metrics when present
         */
        @Override
        public void map(final Vertex vertex, final MapEmitter<NullObject, StandardTraversalMetrics> emitter) {
            VertexTraversalSideEffects.of(vertex).<StandardTraversalMetrics>get(TraversalMetrics.METRICS_KEY).ifPresent(emitter::emit);
        }

        /**
         * Combines by delegating to {@link #reduce(NullObject, Iterator, ReduceEmitter)}.
         */
        @Override
        public void combine(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
            reduce(key, values, emitter);
        }

        /**
         * Emits the merge of all supplied metrics.
         *
         * @param key     unused
         * @param values  the metrics to merge
         * @param emitter receives the merged metrics
         */
        @Override
        public void reduce(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
            emitter.emit(StandardTraversalMetrics.merge(values));
        }

        /**
         * Merges the values of all key/value pairs into the final metrics.
         *
         * @param keyValues the reduced key/value pairs
         * @return the merged metrics
         */
        @Override
        public StandardTraversalMetrics generateFinalResult(final Iterator<KeyValue<NullObject, StandardTraversalMetrics>> keyValues) {
            return StandardTraversalMetrics.merge(IteratorUtils.map(keyValues, KeyValue::getValue));
        }

        /**
         * @return the singleton instance
         */
        public static ProfileMapReduce instance() {
            return INSTANCE;
        }
    }
}