blob: 7d7be32fcbb7d49c7f7c034b8c3671a69c465a2d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.traversal.optimize;
import java.util.Iterator;
import java.util.List;
import org.apache.hugegraph.backend.query.BatchConditionQuery;
import org.apache.hugegraph.backend.query.ConditionQuery;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.type.HugeType;
import org.apache.hugegraph.type.define.HugeKeys;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.hugegraph.iterator.BatchMapperIterator;
public class HugeVertexStepByBatch<E extends Element>
extends HugeVertexStep<E> {
private static final long serialVersionUID = -3609787815053052222L;
private BatchMapperIterator<Traverser.Admin<Vertex>, E> batchIterator;
private Traverser.Admin<Vertex> head;
private Iterator<E> iterator;
public HugeVertexStepByBatch(final VertexStep<E> originalVertexStep) {
super(originalVertexStep);
this.batchIterator = null;
this.head = null;
this.iterator = null;
}
@Override
protected Traverser.Admin<E> processNextStart() {
/* Override super.processNextStart() */
if (this.batchIterator == null) {
int batchSize = (int) Query.QUERY_BATCH;
this.batchIterator = new BatchMapperIterator<>(
batchSize, this.starts, this::flatMap);
}
if (this.batchIterator.hasNext()) {
assert this.head != null;
E item = this.batchIterator.next();
// TODO: find the parent node accurately instead the head
return this.head.split(item, this);
}
throw FastNoSuchElementException.instance();
}
@Override
public void reset() {
super.reset();
this.closeIterator();
this.batchIterator = null;
this.head = null;
}
@Override
public Iterator<?> lastTimeResults() {
/*
* NOTE: fetch page from this iterator, can only get page info of
* the lowest level, may lost info of upper levels.
*/
return this.iterator;
}
@Override
protected void closeIterator() {
CloseableIterator.closeIterator(this.batchIterator);
}
@SuppressWarnings("unchecked")
private Iterator<E> flatMap(List<Traverser.Admin<Vertex>> traversers) {
if (this.head == null && traversers.size() > 0) {
this.head = traversers.get(0);
}
boolean queryVertex = this.returnsVertex();
boolean queryEdge = this.returnsEdge();
assert queryVertex || queryEdge;
if (queryVertex) {
this.iterator = (Iterator<E>) this.vertices(traversers);
} else {
assert queryEdge;
this.iterator = (Iterator<E>) this.edges(traversers);
}
return this.iterator;
}
private Iterator<Vertex> vertices(
List<Traverser.Admin<Vertex>> traversers) {
assert traversers.size() > 0;
Iterator<Edge> edges = this.edges(traversers);
return this.queryAdjacentVertices(edges);
}
private Iterator<Edge> edges(List<Traverser.Admin<Vertex>> traversers) {
assert traversers.size() > 0;
BatchConditionQuery batchQuery = new BatchConditionQuery(
HugeType.EDGE, traversers.size());
for (Traverser.Admin<Vertex> traverser : traversers) {
ConditionQuery query = this.constructEdgesQuery(traverser);
/*
* Merge each query into batch query through IN condition
* NOTE: duplicated results may be removed by backend store
*/
batchQuery.mergeToIN(query, HugeKeys.OWNER_VERTEX);
}
this.injectQueryInfo(batchQuery);
return this.queryEdges(batchQuery);
}
}