blob: 1187a1aecd576e758d03d4029c71352dff8ccea4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.lambda.ElementValueTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.lambda.FunctionTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.lambda.TokenTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.javatuples.Pair;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>> implements ByModulating, TraversalParent {
private char state = 'k';
private Traversal.Admin<S, K> keyTraversal = null;
private Traversal.Admin<S, ?> preTraversal;
private Traversal.Admin<S, V> valueTraversal;
public GroupStep(final Traversal.Admin traversal) {
super(traversal);
this.valueTraversal = this.integrateChild(__.fold().asAdmin());
this.preTraversal = this.integrateChild(generatePreTraversal(this.valueTraversal));
this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal));
this.setSeedSupplier(HashMapSupplier.instance());
}
@Override
public void modulateBy(final Traversal.Admin<?, ?> kvTraversal) {
if ('k' == this.state) {
this.keyTraversal = this.integrateChild(kvTraversal);
this.state = 'v';
} else if ('v' == this.state) {
this.valueTraversal = this.integrateChild(convertValueTraversal(kvTraversal));
this.preTraversal = this.integrateChild(generatePreTraversal(this.valueTraversal));
this.setReducingBiOperator(new GroupBiOperator<>(this.valueTraversal));
this.state = 'x';
} else {
throw new IllegalStateException("The key and value traversals for group()-step have already been set: " + this);
}
}
@Override
public Map<K, V> projectTraverser(final Traverser.Admin<S> traverser) {
final Map<K, V> map = new HashMap<>(1);
if (null == this.preTraversal) {
map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser);
} else {
final TraverserSet traverserSet = new TraverserSet<>();
this.preTraversal.reset();
this.preTraversal.addStart(traverser);
this.preTraversal.getEndStep().forEachRemaining(traverserSet::add);
map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
}
return map;
}
@Override
public String toString() {
return StringFactory.stepString(this, this.keyTraversal, this.valueTraversal);
}
@Override
public List<Traversal.Admin<?, ?>> getLocalChildren() {
final List<Traversal.Admin<?, ?>> children = new ArrayList<>(2);
if (null != this.keyTraversal)
children.add((Traversal.Admin) this.keyTraversal);
children.add(this.valueTraversal);
return children;
}
@Override
public Set<TraverserRequirement> getRequirements() {
return this.getSelfAndChildRequirements(TraverserRequirement.OBJECT, TraverserRequirement.BULK);
}
@Override
public GroupStep<S, K, V> clone() {
final GroupStep<S, K, V> clone = (GroupStep<S, K, V>) super.clone();
if (null != this.keyTraversal)
clone.keyTraversal = this.keyTraversal.clone();
clone.valueTraversal = this.valueTraversal.clone();
clone.preTraversal = this.integrateChild(GroupStep.generatePreTraversal(clone.valueTraversal));
return clone;
}
@Override
public void setTraversal(final Traversal.Admin<?, ?> parentTraversal) {
super.setTraversal(parentTraversal);
integrateChild(this.keyTraversal);
integrateChild(this.valueTraversal);
integrateChild(this.preTraversal);
}
@Override
public int hashCode() {
int result = super.hashCode();
if (this.keyTraversal != null) result ^= this.keyTraversal.hashCode();
result ^= this.valueTraversal.hashCode();
return result;
}
@Override
public Map<K, V> generateFinalResult(final Map<K, V> object) {
return GroupStep.doFinalReduction((Map<K, Object>) object, this.valueTraversal);
}
///////////////////////
public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable {
// size limit before Barrier.processAllStarts() to lazy reduce
private static final int SIZE_LIMIT = 1000;
private Traversal.Admin<?, V> valueTraversal;
private Barrier barrierStep;
public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal) {
// if there is a lambda that can not be serialized, then simply use TraverserSets
if (TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, valueTraversal)) {
this.valueTraversal = null;
this.barrierStep = null;
} else {
this.valueTraversal = valueTraversal;
this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null);
}
}
public GroupBiOperator() {
// no-arg constructor for serialization
}
@Override
public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB) {
for (final K key : mapB.keySet()) {
Object objectA = mapA.get(key);
final Object objectB = mapB.get(key);
assert null != objectB;
if (null == objectA) {
objectA = objectB;
} else {
// TRAVERSER
if (objectA instanceof Traverser.Admin) {
if (objectB instanceof Traverser.Admin) {
final TraverserSet set = new TraverserSet();
set.add((Traverser.Admin) objectA);
set.add((Traverser.Admin) objectB);
objectA = set;
} else if (objectB instanceof TraverserSet) {
final TraverserSet set = (TraverserSet) objectB;
set.add((Traverser.Admin) objectA);
if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
this.valueTraversal.reset();
((Step) this.barrierStep).addStarts(set.iterator());
objectA = this.barrierStep.nextBarrier();
} else
objectA = objectB;
} else if (objectB instanceof Pair) {
final TraverserSet set = (TraverserSet) ((Pair) objectB).getValue0();
set.add((Traverser.Admin) objectA);
if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
this.valueTraversal.reset();
((Step) this.barrierStep).addStarts(set.iterator());
this.barrierStep.addBarrier(((Pair) objectB).getValue1());
objectA = this.barrierStep.nextBarrier();
} else
objectA = Pair.with(set, ((Pair) objectB).getValue1());
} else
objectA = Pair.with(new TraverserSet((Traverser.Admin) objectA), objectB);
// TRAVERSER SET
} else if (objectA instanceof TraverserSet) {
if (objectB instanceof Traverser.Admin) {
final TraverserSet set = (TraverserSet) objectA;
set.add((Traverser.Admin) objectB);
if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
this.valueTraversal.reset();
((Step) this.barrierStep).addStarts(set.iterator());
objectA = this.barrierStep.nextBarrier();
}
} else if (objectB instanceof TraverserSet) {
final TraverserSet set = (TraverserSet) objectA;
set.addAll((TraverserSet) objectB);
if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
this.valueTraversal.reset();
((Step) this.barrierStep).addStarts(set.iterator());
objectA = this.barrierStep.nextBarrier();
}
} else if (objectB instanceof Pair) {
final TraverserSet set = (TraverserSet) objectA;
set.addAll((TraverserSet) ((Pair) objectB).getValue0());
if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
this.valueTraversal.reset();
((Step) this.barrierStep).addStarts(set.iterator());
this.barrierStep.addBarrier(((Pair) objectB).getValue1());
objectA = this.barrierStep.nextBarrier();
} else
objectA = Pair.with(set, ((Pair) objectB).getValue1());
} else
objectA = Pair.with(objectA, objectB);
// TRAVERSER SET + BARRIER
} else if (objectA instanceof Pair) {
if (objectB instanceof Traverser.Admin) {
final TraverserSet set = ((TraverserSet) ((Pair) objectA).getValue0());
set.add((Traverser.Admin) objectB);
if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
this.valueTraversal.reset();
((Step) this.barrierStep).addStarts(set.iterator());
this.barrierStep.addBarrier(((Pair) objectA).getValue1());
objectA = this.barrierStep.nextBarrier();
}
} else if (objectB instanceof TraverserSet) {
final TraverserSet set = (TraverserSet) ((Pair) objectA).getValue0();
set.addAll((TraverserSet) objectB);
if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
this.valueTraversal.reset();
((Step) this.barrierStep).addStarts(set.iterator());
this.barrierStep.addBarrier(((Pair) objectA).getValue1());
objectA = this.barrierStep.nextBarrier();
}
} else if (objectB instanceof Pair) {
this.valueTraversal.reset();
this.barrierStep.addBarrier(((Pair) objectA).getValue1());
this.barrierStep.addBarrier(((Pair) objectB).getValue1());
((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator());
((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator());
objectA = this.barrierStep.nextBarrier();
} else {
this.valueTraversal.reset();
this.barrierStep.addBarrier(((Pair) objectA).getValue1());
this.barrierStep.addBarrier(objectB);
((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator());
objectA = this.barrierStep.nextBarrier();
}
// BARRIER
} else {
if (objectB instanceof Traverser.Admin) {
objectA = Pair.with(new TraverserSet<>((Traverser.Admin) objectB), objectA);
} else if (objectB instanceof TraverserSet) {
objectA = Pair.with(objectB, objectA);
} else if (objectB instanceof Pair) {
this.valueTraversal.reset();
this.barrierStep.addBarrier(objectA);
this.barrierStep.addBarrier(((Pair) objectB).getValue1());
((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator());
objectA = this.barrierStep.nextBarrier();
} else {
this.valueTraversal.reset();
this.barrierStep.addBarrier(objectA);
this.barrierStep.addBarrier(objectB);
objectA = this.barrierStep.nextBarrier();
}
}
}
mapA.put(key, (V) objectA);
}
return mapA;
}
// necessary to control Java Serialization to ensure proper clearing of internal traverser data
private void writeObject(final ObjectOutputStream outputStream) throws IOException {
// necessary as a non-root child is being sent over the wire
if (null != this.valueTraversal) this.valueTraversal.setParent(EmptyStep.instance());
outputStream.writeObject(null == this.valueTraversal ? null : this.valueTraversal.clone()); // todo: reset() instead?
}
private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
this.valueTraversal = (Traversal.Admin<?, V>) inputStream.readObject();
this.barrierStep = null == this.valueTraversal ? null : TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null);
}
}
///////////////////////
public static <S, E> Traversal.Admin<S, E> convertValueTraversal(final Traversal.Admin<S, E> valueTraversal) {
if (valueTraversal instanceof ElementValueTraversal ||
valueTraversal instanceof TokenTraversal ||
valueTraversal instanceof IdentityTraversal ||
valueTraversal.getStartStep() instanceof LambdaMapStep && ((LambdaMapStep) valueTraversal.getStartStep()).getMapFunction() instanceof FunctionTraverser) {
return (Traversal.Admin<S, E>) __.map(valueTraversal).fold();
} else {
return valueTraversal;
}
}
public static Traversal.Admin<?, ?> generatePreTraversal(final Traversal.Admin<?, ?> valueTraversal) {
if (!TraversalHelper.hasStepOfAssignableClass(Barrier.class, valueTraversal))
return valueTraversal;
final Traversal.Admin<?, ?> first = __.identity().asAdmin();
for (final Step step : valueTraversal.getSteps()) {
if (step instanceof Barrier)
break;
first.addStep(step.clone());
}
return first.getSteps().size() == 1 ? null : first;
}
public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, final Traversal.Admin<?, V> valueTraversal) {
final Map<K, V> reducedMap = new HashMap<>(map.size());
final Barrier reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null);
IteratorUtils.removeOnNext(map.entrySet().iterator()).forEachRemaining(entry -> {
valueTraversal.reset();
if (null == reducingBarrierStep) {
reducedMap.put(entry.getKey(), entry.getValue() instanceof TraverserSet ?
((TraverserSet<V>) entry.getValue()).iterator().next().get() :
(V) entry.getValue());
} else {
if (entry.getValue() instanceof Traverser.Admin)
((Step) reducingBarrierStep).addStart((Traverser.Admin) entry.getValue());
else if (entry.getValue() instanceof TraverserSet)
((Step) reducingBarrierStep).addStarts(((TraverserSet) entry.getValue()).iterator());
else if (entry.getValue() instanceof Pair) {
((Step) reducingBarrierStep).addStarts(((TraverserSet) (((Pair) entry.getValue()).getValue0())).iterator());
reducingBarrierStep.addBarrier((((Pair) entry.getValue()).getValue1()));
} else
reducingBarrierStep.addBarrier(entry.getValue());
reducedMap.put(entry.getKey(), valueTraversal.next());
}
});
assert map.isEmpty();
map.clear();
map.putAll(reducedMap);
return (Map<K, V>) map;
}
}