| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.mongodb.aggregation; |
| |
| import java.util.Arrays; |
| |
| import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; |
| import org.bson.Document; |
| import org.openrdf.query.algebra.Distinct; |
| import org.openrdf.query.algebra.Extension; |
| import org.openrdf.query.algebra.Filter; |
| import org.openrdf.query.algebra.Join; |
| import org.openrdf.query.algebra.MultiProjection; |
| import org.openrdf.query.algebra.Projection; |
| import org.openrdf.query.algebra.Reduced; |
| import org.openrdf.query.algebra.StatementPattern; |
| import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; |
| |
| import com.google.common.base.Preconditions; |
| import com.mongodb.MongoClient; |
| import com.mongodb.client.MongoCollection; |
| import com.mongodb.client.MongoDatabase; |
| |
| /** |
| * Visitor that transforms a SPARQL query tree by replacing as much of the tree |
| * as possible with one or more {@code AggregationPipelineQueryNode}s. |
| * <p> |
| * Each {@link AggregationPipelineQueryNode} contains a MongoDB aggregation |
| * pipeline which is equivalent to the replaced portion of the original query. |
| * Evaluating this node executes the pipeline and converts the results into |
| * query solutions. If only part of the query was transformed, the remaining |
| * query logic (higher up in the query tree) can be applied to those |
| * intermediate solutions as normal. |
| * <p> |
| * In general, processes the tree in bottom-up order: A leaf node |
| * ({@link StatementPattern}) is replaced with a pipeline that matches the |
| * corresponding statements. Then, if the parent node's semantics are supported |
| * by the visitor, stages are appended to the pipeline and the subtree at the |
| * parent node is replaced with the extended pipeline. This continues up the |
| * tree until reaching a node that cannot be transformed, in which case that |
| * node's child is now a single {@code AggregationPipelineQueryNode} (a leaf |
| * node) instead of the previous subtree, or until the entire tree has been |
| * subsumed into a single pipeline node. |
| * <p> |
| * Nodes which are transformed into pipeline stages: |
| * <p><ul> |
| * <li>A {@code StatementPattern} node forms the beginning of each pipeline. |
| * <li>Single-argument operations {@link Projection}, {@link MultiProjection}, |
| * {@link Extension}, {@link Distinct}, and {@link Reduced} will be transformed |
| * into pipeline stages whenever the child {@link TupleExpr} represents a |
| * pipeline. |
| * <li>A {@link Filter} operation will be appended to the pipeline when its |
| * child {@code TupleExpr} represents a pipeline and the filter condition is a |
| * type of {@link ValueExpr} understood by {@code AggregationPipelineQueryNode}. |
| * <li>A {@link Join} operation will be appended to the pipeline when one child |
| * is a {@code StatementPattern} and the other is an |
| * {@code AggregationPipelineQueryNode}. |
| * </ul> |
| */ |
| public class SparqlToPipelineTransformVisitor extends QueryModelVisitorBase<Exception> { |
| private final MongoCollection<Document> inputCollection; |
| |
| /** |
| * Instantiate a visitor directly from a {@link MongoCollection}. |
| * @param inputCollection Stores triples. |
| */ |
| public SparqlToPipelineTransformVisitor(MongoCollection<Document> inputCollection) { |
| this.inputCollection = Preconditions.checkNotNull(inputCollection); |
| } |
| |
| /** |
| * Instantiate a visitor from a {@link MongoDBRdfConfiguration}. |
| * @param conf Contains database connection information. |
| */ |
| public SparqlToPipelineTransformVisitor(StatefulMongoDBRdfConfiguration conf) { |
| Preconditions.checkNotNull(conf); |
| MongoClient mongo = conf.getMongoClient(); |
| MongoDatabase db = mongo.getDatabase(conf.getMongoDBName()); |
| this.inputCollection = db.getCollection(conf.getTriplesCollectionName()); |
| } |
| |
| @Override |
| public void meet(StatementPattern sp) { |
| sp.replaceWith(new AggregationPipelineQueryNode(inputCollection, sp)); |
| } |
| |
| @Override |
| public void meet(Join join) throws Exception { |
| // If one branch is a single statement pattern, then try replacing the |
| // other with a pipeline. |
| AggregationPipelineQueryNode pipelineNode = null; |
| StatementPattern joinWithSP = null; |
| if (join.getRightArg() instanceof StatementPattern) { |
| join.getLeftArg().visit(this); |
| if (join.getLeftArg() instanceof AggregationPipelineQueryNode) { |
| pipelineNode = (AggregationPipelineQueryNode) join.getLeftArg(); |
| joinWithSP = (StatementPattern) join.getRightArg(); |
| } |
| } |
| else if (join.getLeftArg() instanceof StatementPattern) { |
| join.getRightArg().visit(this); |
| if (join.getRightArg() instanceof AggregationPipelineQueryNode) { |
| pipelineNode = (AggregationPipelineQueryNode) join.getRightArg(); |
| joinWithSP = (StatementPattern) join.getLeftArg(); |
| } |
| } |
| else { |
| // Otherwise, visit the children to try to replace smaller subtrees |
| join.visitChildren(this); |
| } |
| // If this is now a join between a pipeline node and a statement |
| // pattern, add the join step at the end of the pipeline, and replace |
| // this node with the extended pipeline node. |
| if (pipelineNode != null && joinWithSP != null && pipelineNode.joinWith(joinWithSP)) { |
| join.replaceWith(pipelineNode); |
| } |
| } |
| |
| @Override |
| public void meet(Projection projectionNode) throws Exception { |
| projectionNode.visitChildren(this); |
| if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) { |
| AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg(); |
| if (pipelineNode.project(Arrays.asList(projectionNode.getProjectionElemList()))) { |
| projectionNode.replaceWith(pipelineNode); |
| } |
| } |
| } |
| |
| @Override |
| public void meet(MultiProjection projectionNode) throws Exception { |
| projectionNode.visitChildren(this); |
| if (projectionNode.getArg() instanceof AggregationPipelineQueryNode && projectionNode.getParentNode() != null) { |
| AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectionNode.getArg(); |
| if (pipelineNode.project(projectionNode.getProjections())) { |
| projectionNode.replaceWith(pipelineNode); |
| } |
| } |
| } |
| |
| @Override |
| public void meet(Extension extensionNode) throws Exception { |
| extensionNode.visitChildren(this); |
| if (extensionNode.getArg() instanceof AggregationPipelineQueryNode && extensionNode.getParentNode() != null) { |
| AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) extensionNode.getArg(); |
| if (pipelineNode.extend(extensionNode.getElements())) { |
| extensionNode.replaceWith(pipelineNode); |
| } |
| } |
| } |
| |
| @Override |
| public void meet(Reduced reducedNode) throws Exception { |
| reducedNode.visitChildren(this); |
| if (reducedNode.getArg() instanceof AggregationPipelineQueryNode && reducedNode.getParentNode() != null) { |
| reducedNode.replaceWith(reducedNode.getArg()); |
| } |
| } |
| |
| @Override |
| public void meet(Distinct distinctNode) throws Exception { |
| distinctNode.visitChildren(this); |
| if (distinctNode.getArg() instanceof AggregationPipelineQueryNode && distinctNode.getParentNode() != null) { |
| AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) distinctNode.getArg(); |
| pipelineNode.distinct(); |
| distinctNode.replaceWith(pipelineNode); |
| } |
| } |
| |
| @Override |
| public void meet(Filter filterNode) throws Exception { |
| filterNode.visitChildren(this); |
| if (filterNode.getArg() instanceof AggregationPipelineQueryNode && filterNode.getParentNode() != null) { |
| AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) filterNode.getArg(); |
| if (pipelineNode.filter(filterNode.getCondition())) { |
| filterNode.replaceWith(pipelineNode); |
| } |
| } |
| } |
| } |