| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.rya.indexing.pcj.fluo.app.query; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX; |
| import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX; |
| import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX; |
| import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX; |
| import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX; |
| import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX; |
| import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.rya.api.client.CreatePCJ.ExportStrategy; |
| import org.apache.rya.api.client.CreatePCJ.QueryType; |
| import org.apache.rya.api.domain.VarNameUtils; |
| import org.apache.rya.api.function.aggregation.AggregationElement; |
| import org.apache.rya.api.function.aggregation.AggregationType; |
| import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; |
| import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection; |
| import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; |
| import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; |
| import org.apache.rya.indexing.pcj.fluo.app.NodeType; |
| import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; |
| import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; |
| import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException; |
| import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; |
| import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; |
| import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction; |
| import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; |
| import org.eclipse.rdf4j.model.Value; |
| import org.eclipse.rdf4j.model.ValueFactory; |
| import org.eclipse.rdf4j.model.impl.SimpleValueFactory; |
| import org.eclipse.rdf4j.query.MalformedQueryException; |
| import org.eclipse.rdf4j.query.algebra.AggregateOperator; |
| import org.eclipse.rdf4j.query.algebra.BNodeGenerator; |
| import org.eclipse.rdf4j.query.algebra.Extension; |
| import org.eclipse.rdf4j.query.algebra.ExtensionElem; |
| import org.eclipse.rdf4j.query.algebra.Filter; |
| import org.eclipse.rdf4j.query.algebra.Group; |
| import org.eclipse.rdf4j.query.algebra.GroupElem; |
| import org.eclipse.rdf4j.query.algebra.Join; |
| import org.eclipse.rdf4j.query.algebra.LeftJoin; |
| import org.eclipse.rdf4j.query.algebra.MultiProjection; |
| import org.eclipse.rdf4j.query.algebra.Projection; |
| import org.eclipse.rdf4j.query.algebra.ProjectionElem; |
| import org.eclipse.rdf4j.query.algebra.ProjectionElemList; |
| import org.eclipse.rdf4j.query.algebra.QueryModelNode; |
| import org.eclipse.rdf4j.query.algebra.Reduced; |
| import org.eclipse.rdf4j.query.algebra.StatementPattern; |
| import org.eclipse.rdf4j.query.algebra.TupleExpr; |
| import org.eclipse.rdf4j.query.algebra.UnaryTupleOperator; |
| import org.eclipse.rdf4j.query.algebra.ValueConstant; |
| import org.eclipse.rdf4j.query.algebra.ValueExpr; |
| import org.eclipse.rdf4j.query.algebra.Var; |
| import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor; |
| import org.eclipse.rdf4j.query.parser.ParsedQuery; |
| import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Sets; |
| |
| import edu.umd.cs.findbugs.annotations.DefaultAnnotation; |
| import edu.umd.cs.findbugs.annotations.NonNull; |
| import net.jcip.annotations.Immutable; |
| |
| /** |
| * Creates the {@link FluoQuery} metadata that is required by the Fluo |
| * application to process a SPARQL query. |
| */ |
| public class SparqlFluoQueryBuilder { |
| |
| private String sparql; |
| private TupleExpr te; |
| private String queryId; |
| private NodeIds nodeIds; |
| private Optional<Integer> joinBatchSize = Optional.empty(); |
| private static final ValueFactory VF = SimpleValueFactory.getInstance(); |
| |
| //Default behavior is to export to Kafka - subject to change when user can |
| //specify their own export strategy |
| private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.KAFKA)); |
| |
| public SparqlFluoQueryBuilder setSparql(final String sparql) { |
| this.sparql = Preconditions.checkNotNull(sparql); |
| return this; |
| } |
| |
| public SparqlFluoQueryBuilder setTupleExpr(final TupleExpr te) { |
| this.te = Preconditions.checkNotNull(te); |
| return this; |
| } |
| |
| /** |
| * Sets the FluoQuery id as generated by {@link NodeType#generateNewFluoIdForType(NodeType)} or |
| * {@link NodeType#generateNewIdForType(NodeType, String)}, where NodeType is of type Query. |
| * @param queryId for the {@link FluoQuery} |
| * @return SparqlFluoQueryBuilder for chaining method calls |
| */ |
| public SparqlFluoQueryBuilder setFluoQueryId(final String queryId) { |
| this.queryId = Preconditions.checkNotNull(queryId); |
| return this; |
| } |
| |
| public SparqlFluoQueryBuilder setNodeIds(final NodeIds nodeIds) { |
| this.nodeIds = Preconditions.checkNotNull(nodeIds); |
| return this; |
| } |
| |
| public SparqlFluoQueryBuilder setExportStrategies(final Set<ExportStrategy> exportStrategies) { |
| this.exportStrategies = exportStrategies; |
| return this; |
| } |
| |
| public SparqlFluoQueryBuilder setJoinBatchSize(final int joinBatchSize) { |
| Preconditions.checkArgument(joinBatchSize > 0); |
| this.joinBatchSize = Optional.of(joinBatchSize); |
| return this; |
| } |
| |
| public FluoQuery build() throws UnsupportedQueryException { |
| Preconditions.checkNotNull(sparql); |
| Preconditions.checkNotNull(queryId); |
| Preconditions.checkNotNull(exportStrategies); |
| |
| if(nodeIds == null) { |
| nodeIds = new NodeIds(); |
| } |
| |
| if(te == null) { |
| final SPARQLParser parser = new SPARQLParser(); |
| ParsedQuery pq; |
| try { |
| pq = parser.parseQuery(sparql, null); |
| } catch (final MalformedQueryException e) { |
| throw new RuntimeException(e); |
| } |
| te = pq.getTupleExpr(); |
| } |
| |
| PeriodicQueryUtil.placePeriodicQueryNode(te); |
| final String childNodeId = nodeIds.getOrMakeId(te); |
| |
| final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder(); |
| final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId); |
| //sets {@link QueryType} and VariableOrder |
| setVarOrderAndQueryType(queryBuilder, te); |
| queryBuilder |
| .setSparql(sparql) |
| .setChildNodeId(childNodeId) |
| .setExportStrategies(exportStrategies) |
| .setJoinBatchSize(joinBatchSize); |
| |
| fluoQueryBuilder.setQueryMetadata(queryBuilder); |
| |
| setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId); |
| |
| final NewQueryVisitor visitor = new NewQueryVisitor(fluoQueryBuilder, nodeIds); |
| te.visit( visitor ); |
| |
| final FluoQuery fluoQuery = fluoQueryBuilder.build(); |
| return fluoQuery; |
| } |
| |
| /** |
| * A data structure that creates and keeps track of Node IDs for the nodes |
| * of a {@link ParsedQuery}. This structure should only be used while creating |
| * a new PCJ in Fluo and disposed of afterwards. |
| */ |
| @DefaultAnnotation(NonNull.class) |
| public static final class NodeIds { |
| |
| /** |
| * Maps from a parsed SPARQL query's node to the Node ID that has been assigned to it. |
| */ |
| private final Map<QueryModelNode, String> nodeIds = new HashMap<>(); |
| |
| /** |
| * Checks if a SPARQL query's node has had a Node ID assigned to it yet. |
| * |
| * @param node - The node to check. (not null) |
| * @return {@code true} if the {@code node} has had a Node ID assigned to |
| * it; otherwise {@code false}. |
| */ |
| public boolean hasId(final QueryModelNode node) { |
| checkNotNull(node); |
| return nodeIds.containsKey(node); |
| } |
| |
| /** |
| * Get the Node ID that has been assigned a specific node of a SPARQL query. |
| * |
| * @param node - The node whose ID will be fetched. (not null) |
| * @return The Node ID that has been assigned to {@code node} if one |
| * has been assigned to it; otherwise {@code absent}. |
| */ |
| public Optional<String> getId(final QueryModelNode node) { |
| checkNotNull(node); |
| return Optional.ofNullable( nodeIds.get(node) ); |
| } |
| |
| /** |
| * Get the Node ID that has been assigned to a specific node of a SPARQL |
| * query. If one hasn't been assigned yet, then one will be generated |
| * and assigned to it. |
| * |
| * @param node - The node whose ID will be fetched or generated. (not null) |
| * @return The Node ID that is assigned to {@code node}. |
| */ |
| public String getOrMakeId(final QueryModelNode node) { |
| checkNotNull(node); |
| |
| // If a Node ID has already been assigned, return it. |
| if(nodeIds.containsKey(node)){ |
| return nodeIds.get(node); |
| } |
| |
| // Otherwise create a new ID and store it for later. |
| final String id = makeId(node); |
| nodeIds.put(node, id); |
| return id; |
| } |
| |
| private String makeId(final QueryModelNode node) { |
| checkNotNull(node); |
| |
| // Create the prefix of the id. This makes it a little bit more human readable. |
| String prefix; |
| if (node instanceof StatementPattern) { |
| prefix = SP_PREFIX; |
| } else if (node instanceof Filter) { |
| prefix = FILTER_PREFIX; |
| } else if (node instanceof Join || node instanceof LeftJoin) { |
| prefix = JOIN_PREFIX; |
| } else if (node instanceof Projection) { |
| prefix = PROJECTION_PREFIX; |
| } else if(node instanceof Extension) { |
| prefix = AGGREGATION_PREFIX; |
| } else if (node instanceof Reduced) { |
| prefix = CONSTRUCT_PREFIX; |
| } else if(node instanceof PeriodicQueryNode) { |
| prefix = PERIODIC_QUERY_PREFIX; |
| } else { |
| throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass()); |
| } |
| |
| final String unique = UUID.randomUUID().toString().replaceAll("-", ""); |
| // Put them together to create the Node ID. |
| return prefix + "_" + unique; |
| } |
| |
| } |
| |
| /** |
| * Visits each node of a {@link ParsedQuery} and adds information about |
| * the node to a {@link FluoQuery.Builder}. This information is used by the |
| * application's observers to incrementally update a PCJ. |
| */ |
| public static class NewQueryVisitor extends AbstractQueryModelVisitor<RuntimeException> { |
| |
| private final NodeIds nodeIds; |
| private final FluoQuery.Builder fluoQueryBuilder; |
| |
| /** |
| * Constructs an instance of {@link NewQueryVisitor}. |
| * |
| * @param fluoQueryBuilder - The builder that will be updated by this |
| * vistior to include metadata about each of the query nodes. (not null) |
| * @param nodeIds - The NodeIds object is passed in so that other parts |
| * of the application may look up which ID is associated with each |
| * node of the query. |
| */ |
| public NewQueryVisitor(final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) { |
| this.fluoQueryBuilder = checkNotNull(fluoQueryBuilder); |
| this.nodeIds = checkNotNull(nodeIds); |
| } |
| |
| /** |
| * If we encounter an Extension node that contains a Group, then we've found an aggregation. |
| */ |
| @Override |
| public void meet(final Extension node) { |
| final TupleExpr arg = node.getArg(); |
| if(arg instanceof Group) { |
| final Group group = (Group) arg; |
| |
| // Get the Aggregation Node's id. |
| final String aggregationId = nodeIds.getOrMakeId(node); |
| |
| // Get the group's child node id. This call forces it to be a supported child type. |
| final TupleExpr child = group.getArg(); |
| final String childNodeId = nodeIds.getOrMakeId( child ); |
| |
| // Get the list of group by binding names. |
| VariableOrder groupByVariableOrder = null; |
| if(!group.getGroupBindingNames().isEmpty()) { |
| groupByVariableOrder = new VariableOrder(group.getGroupBindingNames()); |
| } else { |
| groupByVariableOrder = new VariableOrder(); |
| } |
| |
| |
| // The aggregations that need to be performed are the Group Elements. |
| final List<AggregationElement> aggregations = new ArrayList<>(); |
| for(final GroupElem groupElem : group.getGroupElements()) { |
| // Figure out the type of the aggregation. |
| final AggregateOperator operator = groupElem.getOperator(); |
| final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() ); |
| |
| // If the type is one we support, create the AggregationElement. |
| if(type.isPresent()) { |
| final String resultBindingName = groupElem.getName(); |
| |
| final AtomicReference<String> aggregatedBindingName = new AtomicReference<>(); |
| groupElem.visitChildren(new AbstractQueryModelVisitor<RuntimeException>() { |
| @Override |
| public void meet(final Var node) { |
| aggregatedBindingName.set( node.getName() ); |
| } |
| }); |
| |
| aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) ); |
| } |
| } |
| |
| // Update the aggregation's metadata. |
| AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(aggregationId).orNull(); |
| if(aggregationBuilder == null) { |
| aggregationBuilder = AggregationMetadata.builder(aggregationId); |
| fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); |
| } |
| |
| aggregationBuilder.setChildNodeId(childNodeId); |
| aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder); |
| |
| final Set<String> aggregationVars = getVarsToDelete(groupByVariableOrder.getVariableOrders(), aggregationBuilder.getVariableOrder().getVariableOrders()); |
| FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.DeleteVariable, Lists.newArrayList(aggregationVars), aggregationId); |
| |
| for(final AggregationElement aggregation : aggregations) { |
| aggregationBuilder.addAggregation(aggregation); |
| } |
| |
| |
| |
| // Update the child node's metadata. |
| final Set<String> childVars = getVars(child); |
| final VariableOrder childVarOrder = new VariableOrder(childVars); |
| |
| setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, aggregationId); |
| } |
| |
| // Walk to the next node. |
| super.meet(node); |
| } |
| |
| @Override |
| public void meet(final StatementPattern node) { |
| // Extract metadata that will be stored from the node. |
| final String spNodeId = nodeIds.getOrMakeId(node); |
| final String pattern = FluoStringConverter.toStatementPatternString(node); |
| |
| // Get or create a builder for this node populated with the known metadata. |
| StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(spNodeId).orNull(); |
| if(spBuilder == null) { |
| spBuilder = StatementPatternMetadata.builder(spNodeId); |
| fluoQueryBuilder.addStatementPatternBuilder(spBuilder); |
| } |
| spBuilder.setStatementPattern(pattern); |
| } |
| |
| @Override |
| public void meet(final LeftJoin node) { |
| // Extract the metadata that will be stored for the node. |
| final String leftJoinNodeId = nodeIds.getOrMakeId(node); |
| final QueryModelNode left = node.getLeftArg(); |
| final QueryModelNode right = node.getRightArg(); |
| |
| // Update the metadata for the JoinMetadata.Builder. |
| makeJoinMetadata(leftJoinNodeId, JoinType.LEFT_OUTER_JOIN, left, right); |
| |
| // Walk to the next node. |
| super.meet(node); |
| } |
| |
| @Override |
| public void meet(final Join node) { |
| // Extract the metadata that will be stored from the node. |
| final String joinNodeId = nodeIds.getOrMakeId(node); |
| final QueryModelNode left = node.getLeftArg(); |
| final QueryModelNode right = node.getRightArg(); |
| |
| // Update the metadata for the JoinMetadata.Builder. |
| makeJoinMetadata(joinNodeId, JoinType.NATURAL_JOIN, left, right); |
| |
| // Walk to the next node. |
| super.meet(node); |
| } |
| |
| private void makeJoinMetadata(final String joinNodeId, final JoinType joinType, final QueryModelNode left, final QueryModelNode right) { |
| final String leftChildNodeId = nodeIds.getOrMakeId(left); |
| final String rightChildNodeId = nodeIds.getOrMakeId(right); |
| |
| // Get or create a builder for this node populated with the known metadata. |
| JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(joinNodeId).orNull(); |
| if(joinBuilder == null) { |
| joinBuilder = JoinMetadata.builder(joinNodeId); |
| fluoQueryBuilder.addJoinMetadata(joinBuilder); |
| } |
| joinBuilder.setJoinType(joinType); |
| joinBuilder.setLeftChildNodeId( leftChildNodeId ); |
| joinBuilder.setRightChildNodeId( rightChildNodeId ); |
| if(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().isPresent()) { |
| joinBuilder.setJoinBatchSize(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().get()); |
| } |
| |
| // Figure out the variable order for each child node's binding set and |
| // store it. Also store that each child node's parent is this join. |
| final Set<String> leftVars = getVars((TupleExpr)left); |
| final Set<String> rightVars = getVars((TupleExpr) right); |
| final JoinVarOrders varOrders = getJoinArgVarOrders(leftVars, rightVars); |
| |
| // Create or update the left child's variable order and parent node id. |
| final VariableOrder leftVarOrder = varOrders.getLeftVarOrder(); |
| setChildMetadata(fluoQueryBuilder, leftChildNodeId, leftVarOrder, joinNodeId); |
| |
| // Create or update the right child's variable order and parent node id. |
| final VariableOrder rightVarOrder = varOrders.getRightVarOrder(); |
| setChildMetadata(fluoQueryBuilder, rightChildNodeId, rightVarOrder, joinNodeId); |
| } |
| |
| @Override |
| public void meet(final Filter node) { |
| |
| // Get or create a builder for this node populated with the known metadata. |
| final String filterId = nodeIds.getOrMakeId(node); |
| |
| FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(filterId).orNull(); |
| if(filterBuilder == null) { |
| filterBuilder = FilterMetadata.builder(filterId); |
| fluoQueryBuilder.addFilterMetadata(filterBuilder); |
| } |
| |
| String filterString; |
| try { |
| filterString = FilterSerializer.serialize(node); |
| } catch (final FilterParseException e) { |
| throw new RuntimeException(e); |
| } |
| filterBuilder.setFilterSparql(filterString); |
| |
| final QueryModelNode child = node.getArg(); |
| if(child == null) { |
| throw new IllegalArgumentException("Filter arg connot be null."); |
| } |
| |
| final String childNodeId = nodeIds.getOrMakeId(child); |
| filterBuilder.setChildNodeId(childNodeId); |
| |
| // Update the child node's metadata. |
| final Set<String> childVars = getVars((TupleExpr)child); |
| final VariableOrder childVarOrder = new VariableOrder(childVars); |
| setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, filterId); |
| |
| // Walk to the next node. |
| super.meet(node); |
| } |
| |
| @Override |
| public void meetOther(final QueryModelNode qNode) { |
| if (qNode instanceof PeriodicQueryNode) { |
| final PeriodicQueryNode node = (PeriodicQueryNode) qNode; |
| // Get or create a builder for this node populated with the |
| // known metadata. |
| final String periodicId = nodeIds.getOrMakeId(node); |
| |
| PeriodicQueryMetadata.Builder periodicBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull(); |
| if (periodicBuilder == null) { |
| periodicBuilder = PeriodicQueryMetadata.builder(); |
| periodicBuilder.setNodeId(periodicId); |
| fluoQueryBuilder.addPeriodicQueryMetadata(periodicBuilder); |
| } |
| periodicBuilder.setWindowSize(node.getWindowSize()); |
| periodicBuilder.setPeriod(node.getPeriod()); |
| periodicBuilder.setTemporalVariable(node.getTemporalVariable()); |
| periodicBuilder.setUnit(node.getUnit()); |
| |
| final QueryModelNode child = node.getArg(); |
| if (child == null) { |
| throw new IllegalArgumentException("PeriodicQueryNode child arg connot be null."); |
| } |
| |
| final String childNodeId = nodeIds.getOrMakeId(child); |
| periodicBuilder.setChildNodeId(childNodeId); |
| |
| // Update the child node's metadata. |
| final Set<String> childVars = getVars((TupleExpr) child); |
| final VariableOrder childVarOrder = new VariableOrder(childVars); |
| setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, periodicId); |
| |
| // update variable order of this node and all ancestors to |
| // include BIN_ID binding as |
| // first variable in the ordering |
| FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.AddVariable, |
| Collections.singletonList(IncrementalUpdateConstants.PERIODIC_BIN_ID), periodicId); |
| // Walk to the next node. |
| node.getArg().visit(this); |
| } |
| } |
| |
| |
| @Override |
| public void meet(final Projection node) { |
| // Create a builder for this node populated with the metadata. |
| final String queryId = nodeIds.getOrMakeId(node); |
| |
| ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(queryId).orNull(); |
| if (projectionBuilder == null) { |
| projectionBuilder = ProjectionMetadata.builder(queryId); |
| fluoQueryBuilder.addProjectionBuilder(projectionBuilder); |
| } |
| |
| final QueryModelNode child = node.getArg(); |
| if(child == null) { |
| throw new IllegalArgumentException("Projection arg connot be null."); |
| } |
| |
| final String childNodeId = nodeIds.getOrMakeId(child); |
| projectionBuilder.setChildNodeId(childNodeId); |
| projectionBuilder.setProjectedVars(projectionBuilder.getVariableOrder()); |
| |
| // Update the child node's metadata. |
| final Set<String> childVars = getVars((TupleExpr)child); |
| final VariableOrder childVarOrder = new VariableOrder(childVars); |
| |
| setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, queryId); |
| |
| // Walk to the next node. |
| super.meet(node); |
| } |
| |
| |
| @Override |
| public void meet(final Reduced node) { |
| //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata |
| //builder with FluoQueryBuilder, and add metadata that we currently have |
| final String constructId = nodeIds.getOrMakeId(node); |
| |
| ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull(); |
| if(constructBuilder == null) { |
| constructBuilder = ConstructQueryMetadata.builder(); |
| constructBuilder.setNodeId(constructId); |
| fluoQueryBuilder.setConstructQueryMetadata(constructBuilder); |
| } |
| |
| //get child node |
| QueryModelNode child = node.getArg(); |
| Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection); |
| final UnaryTupleOperator unary = (UnaryTupleOperator) child; |
| |
| //get ProjectionElemList to build ConstructGraph |
| final List<ProjectionElemList> projections = new ArrayList<>(); |
| if(unary instanceof Projection) { |
| projections.add(((Projection) unary).getProjectionElemList()); |
| } else { |
| projections.addAll(((MultiProjection)unary).getProjections()); |
| } |
| |
| //get ExtensionElems to build ConstructGraph |
| final QueryModelNode grandChild = unary.getArg(); |
| Preconditions.checkArgument(grandChild instanceof Extension); |
| final Extension extension = (Extension) grandChild; |
| final List<ExtensionElem> extensionElems = extension.getElements(); |
| final ConstructGraph graph = getConstructGraph(projections, extensionElems); |
| constructBuilder.setConstructGraph(graph); |
| |
| //set child to the next node we care about in Fluo |
| //if Extension's arg is a Group node, then it is an Aggregation, so set child to Extension |
| //otherwise set child to Extension's child (only care about Extensions if they are Aggregations) |
| if(extension.getArg() instanceof Group) { |
| child = extension; |
| } else { |
| child = extension.getArg(); |
| } |
| |
| //Set the child node in the ConstructQueryMetadataBuilder |
| final String childNodeId = nodeIds.getOrMakeId(child); |
| constructBuilder.setChildNodeId(childNodeId); |
| |
| // Update the child node's metadata. |
| final Set<String> childVars = getVars((TupleExpr)child); |
| final VariableOrder childVarOrder = new VariableOrder(childVars); |
| setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, constructId); |
| |
| //fast forward visitor to next node we care about |
| child.visit(this); |
| } |
| |
| private ConstructGraph getConstructGraph(final List<ProjectionElemList> projections, final List<ExtensionElem> extensionElems) { |
| final Map<String, Value> valueMap = new HashMap<>(); |
| //create valueMap to associate source names with Values |
| for(final ExtensionElem elem: extensionElems) { |
| final String name = elem.getName(); |
| final ValueExpr expr = elem.getExpr(); |
| if(expr instanceof ValueConstant) { |
| final Value value = ((ValueConstant) expr).getValue(); |
| valueMap.put(name, value); |
| } else if(expr instanceof BNodeGenerator) { |
| valueMap.put(name, VF.createBNode(UUID.randomUUID().toString())); |
| } |
| } |
| |
| final Set<ConstructProjection> constructProj = new HashSet<>(); |
| //build ConstructProjection for each ProjectionElemList |
| for(final ProjectionElemList list: projections) { |
| validateProjectionElemList(list); |
| final List<Var> vars = new ArrayList<>(); |
| for(final ProjectionElem elem: list.getElements()) { |
| final String sourceName = elem.getSourceName(); |
| final Var var = new Var(sourceName); |
| if(valueMap.containsKey(sourceName)) { |
| var.setValue(valueMap.get(sourceName)); |
| } |
| vars.add(var); |
| } |
| constructProj.add(new ConstructProjection(vars.get(0), vars.get(1), vars.get(2))); |
| } |
| |
| return new ConstructGraph(constructProj); |
| } |
| |
| private Set<String> getVarsToDelete(final Collection<String> groupByVars, final Collection<String> varOrderVars) { |
| final Set<String> groupBySet = Sets.newHashSet(groupByVars); |
| final Set<String> varOrderSet = Sets.newHashSet(varOrderVars); |
| |
| return Sets.difference(varOrderSet, groupBySet); |
| } |
| |
| private void validateProjectionElemList(final ProjectionElemList list) { |
| final List<ProjectionElem> elements = list.getElements(); |
| checkArgument(elements.size() == 3); |
| checkArgument(elements.get(0).getTargetName().equals("subject")); |
| checkArgument(elements.get(1).getTargetName().equals("predicate")); |
| checkArgument(elements.get(2).getTargetName().equals("object")); |
| } |
| |
| /** |
| * Get the non-constant variables from a {@link TupleExpr}. |
| * |
| * @param node - The node to inspect for variables. (not null) |
| * @return The non-constant variables that were part of the node. |
| */ |
| private Set<String> getVars(final TupleExpr node) { |
| checkNotNull(node); |
| |
| final Set<String> vars = Sets.newHashSet(); |
| |
| for(final String bindingName : node.getBindingNames()) { |
| if (!VarNameUtils.isConstant(bindingName)) { |
| vars.add(bindingName); |
| } |
| } |
| |
| return vars; |
| } |
| |
| /** |
| * Holds the Variable Order of the binding sets for the children of a join node. |
| */ |
| @Immutable |
| @DefaultAnnotation(NonNull.class) |
| private static final class JoinVarOrders { |
| private final VariableOrder leftVarOrder; |
| private final VariableOrder rightVarOrder; |
| |
| /** |
| * Constructs an instance of {@link }. |
| * |
| * @param leftVarOrder - The left child's Variable Order. (not null) |
| * @param rightVarOrder - The right child's Variable Order. (not null) |
| */ |
| public JoinVarOrders(final VariableOrder leftVarOrder, final VariableOrder rightVarOrder) { |
| this.leftVarOrder = checkNotNull(leftVarOrder); |
| this.rightVarOrder = checkNotNull(rightVarOrder); |
| } |
| |
| /** |
| * @return The left child's Variable Order. |
| */ |
| public VariableOrder getLeftVarOrder() { |
| return leftVarOrder; |
| } |
| |
| /** |
| * @return The right child's Variable Order. |
| */ |
| public VariableOrder getRightVarOrder() { |
| return rightVarOrder; |
| } |
| } |
| |
| /** |
| * Shifts the common variables between the two children to the left so |
| * that Accumulo scans when performing the join are efficient. |
| * |
| * @param leftVars - The left child's variables. (not null) |
| * @param rightVars - The right child's variables. (not null) |
| * @return An object holding the left and right children's variable orders. |
| */ |
| private JoinVarOrders getJoinArgVarOrders(final Set<String> leftVars, final Set<String> rightVars) { |
| checkNotNull(leftVars); |
| checkNotNull(rightVars); |
| |
| // Find the common variables between the left and right children. The common vars |
| // are stored in a list to ensure iteration order is always the same. |
| final List<String> commonVars = new ArrayList<>( Sets.intersection(leftVars, rightVars) ); |
| |
| // Push all of the common variables to the left for each child's vars. |
| final List<String> leftVarOrder = leftShiftCommonVars(commonVars, leftVars); |
| final List<String> rightVarOrder = leftShiftCommonVars(commonVars, rightVars); |
| return new JoinVarOrders(new VariableOrder(leftVarOrder), new VariableOrder(rightVarOrder)); |
| } |
| |
| /** |
| * Orders the set of common variables so that all of the common ones |
| * are on the left in the same order they have been provided. The rest |
| * of the variables are iterated over and added to the end of the list |
| * in no particular order. |
| * |
| * @param commonVars - An ordered list of variables that must appear on the left. (not null) |
| * @param allVars - The variables that need to be ordered. (not null) |
| * @return A list of variables ordered as described above. |
| */ |
| private List<String> leftShiftCommonVars(final List<String> commonVars, final Collection<String> allVars) { |
| checkNotNull(commonVars); |
| checkNotNull(allVars); |
| |
| final List<String> shifted = Lists.newArrayList(commonVars); |
| for(final String var : allVars) { |
| if(!shifted.contains(var)) { |
| shifted.add(var); |
| } |
| } |
| return shifted; |
| } |
| } |
| |
| private void setVarOrderAndQueryType(final QueryMetadata.Builder builder, final TupleExpr te) { |
| final QueryMetadataLocator locator = new QueryMetadataLocator(); |
| try { |
| te.visit(locator); |
| } catch (final Exception e) { |
| throw new RuntimeException(e); |
| } |
| |
| builder.setVarOrder(locator.getVarOrder()); |
| builder.setQueryType(locator.getQueryType()); |
| } |
| |
| public static class QueryMetadataLocator extends AbstractQueryModelVisitor<Exception> { |
| |
| private VariableOrder varOrder; |
| private QueryType queryType; |
| |
| public VariableOrder getVarOrder() { |
| return varOrder; |
| } |
| |
| public QueryType getQueryType() { |
| return queryType; |
| } |
| |
| @Override |
| public void meet(final Projection node) throws Exception { |
| final Set<String> bindingNames = node.getBindingNames(); |
| if(varOrder == null) { |
| varOrder = new VariableOrder(bindingNames); |
| } |
| |
| if(queryType == null) { |
| queryType = QueryType.PROJECTION; |
| } |
| super.meet(node); |
| } |
| |
| @Override |
| public void meet(final Reduced node) throws Exception { |
| if(varOrder == null) { |
| varOrder = getConstructGraphVarOrder(node); |
| } |
| |
| if(queryType == null) { |
| queryType = QueryType.CONSTRUCT; |
| } |
| super.meet(node); |
| } |
| |
| @Override |
| public void meetOther(final QueryModelNode node) throws Exception { |
| if (node instanceof PeriodicQueryNode) { |
| queryType = QueryType.PERIODIC; |
| } else { |
| super.meetOther(node); |
| } |
| } |
| } |
| |
| private static VariableOrder getConstructGraphVarOrder(final Reduced node) { |
| |
| //get child node |
| final QueryModelNode child = node.getArg(); |
| Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection); |
| final UnaryTupleOperator unary = (UnaryTupleOperator) child; |
| |
| //get ProjectionElemList to build ConstructGraph |
| final List<ProjectionElemList> projections = new ArrayList<>(); |
| if(unary instanceof Projection) { |
| projections.add(((Projection) unary).getProjectionElemList()); |
| } else { |
| projections.addAll(((MultiProjection)unary).getProjections()); |
| } |
| |
| return getConstructGraphVarOrder(projections); |
| } |
| |
| private static VariableOrder getConstructGraphVarOrder(List<ProjectionElemList> projections) { |
| final Set<String> varOrders = new HashSet<>(); |
| |
| for(final ProjectionElemList elems: projections) { |
| for(final ProjectionElem elem: elems.getElements()) { |
| final String name = elem.getSourceName(); |
| if (!VarNameUtils.isConstant(name) && !VarNameUtils.isAnonymous(name)) { |
| varOrders.add(name); |
| } |
| } |
| } |
| |
| return new VariableOrder(varOrders); |
| } |
| |
| |
| /** |
| * Update a query node's metadata to include it's binding set variable order |
| * and it's parent node id. This information is only known when handling |
| * the parent node. |
| * |
| * @param fluoQueryBuilder - Builder whose metadata is updatad |
| * @param childNodeId - The node ID of the child node. |
| * @param childVarOrder - The variable order of the child node's binding sets. |
| * @param parentNodeId - The node ID that consumes the child's binding sets. |
| */ |
| private static void setChildMetadata(final FluoQuery.Builder fluoQueryBuilder, final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) { |
| checkNotNull(childNodeId); |
| checkNotNull(childVarOrder); |
| checkNotNull(parentNodeId); |
| |
| final NodeType childType = NodeType.fromNodeId(childNodeId).get(); |
| switch (childType) { |
| case STATEMENT_PATTERN: |
| StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull(); |
| if (spBuilder == null) { |
| spBuilder = StatementPatternMetadata.builder(childNodeId); |
| fluoQueryBuilder.addStatementPatternBuilder(spBuilder); |
| } |
| |
| spBuilder.setVarOrder(childVarOrder); |
| spBuilder.setParentNodeId(parentNodeId); |
| break; |
| |
| case JOIN: |
| JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull(); |
| if (joinBuilder == null) { |
| joinBuilder = JoinMetadata.builder(childNodeId); |
| fluoQueryBuilder.addJoinMetadata(joinBuilder); |
| } |
| |
| joinBuilder.setVarOrder(childVarOrder); |
| joinBuilder.setParentNodeId(parentNodeId); |
| break; |
| |
| case FILTER: |
| FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull(); |
| if (filterBuilder == null) { |
| filterBuilder = FilterMetadata.builder(childNodeId); |
| fluoQueryBuilder.addFilterMetadata(filterBuilder); |
| } |
| |
| filterBuilder.setVarOrder(childVarOrder); |
| filterBuilder.setParentNodeId(parentNodeId); |
| break; |
| |
| case AGGREGATION: |
| AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull(); |
| if (aggregationBuilder == null) { |
| aggregationBuilder = AggregationMetadata.builder(childNodeId); |
| fluoQueryBuilder.addAggregateMetadata(aggregationBuilder); |
| } |
| |
| aggregationBuilder.setVarOrder(childVarOrder); |
| aggregationBuilder.setParentNodeId(parentNodeId); |
| break; |
| |
| case PROJECTION: |
| ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(childNodeId).orNull(); |
| if(projectionBuilder == null) { |
| projectionBuilder = ProjectionMetadata.builder(childNodeId); |
| fluoQueryBuilder.addProjectionBuilder(projectionBuilder); |
| } |
| |
| projectionBuilder.setVarOrder(childVarOrder); |
| projectionBuilder.setParentNodeId(parentNodeId); |
| break; |
| |
| case QUERY: |
| throw new IllegalArgumentException("A QUERY node cannot be the child of another node."); |
| |
| case CONSTRUCT: |
| ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull(); |
| if(constructBuilder == null) { |
| constructBuilder = ConstructQueryMetadata.builder(); |
| constructBuilder.setNodeId(childNodeId); |
| fluoQueryBuilder.setConstructQueryMetadata(constructBuilder); |
| } |
| |
| Preconditions.checkArgument(childNodeId.equals(constructBuilder.getNodeId())); |
| constructBuilder.setVarOrder(childVarOrder); |
| constructBuilder.setParentNodeId(parentNodeId); |
| break; |
| |
| case PERIODIC_QUERY: |
| PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull(); |
| if (periodicQueryBuilder == null) { |
| periodicQueryBuilder = PeriodicQueryMetadata.builder(); |
| periodicQueryBuilder.setNodeId(childNodeId); |
| fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder); |
| } |
| periodicQueryBuilder.setVarOrder(childVarOrder); |
| periodicQueryBuilder.setParentNodeId(parentNodeId); |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unsupported NodeType: " + childType); |
| } |
| } |
| } |