/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.fluo.app.query;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AGGREGATION_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.CONSTRUCT_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.FILTER_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.JOIN_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.PROJECTION_PREFIX;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.SP_PREFIX;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.api.domain.VarNameUtils;
import org.apache.rya.api.function.aggregation.AggregationElement;
import org.apache.rya.api.function.aggregation.AggregationType;
import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
import org.apache.rya.indexing.pcj.fluo.app.ConstructProjection;
import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer.FilterParseException;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.VariableOrderUpdateVisitor.UpdateAction;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.algebra.AggregateOperator;
import org.eclipse.rdf4j.query.algebra.BNodeGenerator;
import org.eclipse.rdf4j.query.algebra.Extension;
import org.eclipse.rdf4j.query.algebra.ExtensionElem;
import org.eclipse.rdf4j.query.algebra.Filter;
import org.eclipse.rdf4j.query.algebra.Group;
import org.eclipse.rdf4j.query.algebra.GroupElem;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.LeftJoin;
import org.eclipse.rdf4j.query.algebra.MultiProjection;
import org.eclipse.rdf4j.query.algebra.Projection;
import org.eclipse.rdf4j.query.algebra.ProjectionElem;
import org.eclipse.rdf4j.query.algebra.ProjectionElemList;
import org.eclipse.rdf4j.query.algebra.QueryModelNode;
import org.eclipse.rdf4j.query.algebra.Reduced;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.UnaryTupleOperator;
import org.eclipse.rdf4j.query.algebra.ValueConstant;
import org.eclipse.rdf4j.query.algebra.ValueExpr;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import net.jcip.annotations.Immutable;

/**
 * Creates the {@link FluoQuery} metadata that is required by the Fluo
 * application to process a SPARQL query.
 */
public class SparqlFluoQueryBuilder {

    private String sparql;
    private TupleExpr te;
    private String queryId;
    private NodeIds nodeIds;
    private Optional<Integer> joinBatchSize = Optional.empty();
    private static final ValueFactory VF = SimpleValueFactory.getInstance();

    //Default behavior is to export to Kafka - subject to change when user can
    //specify their own export strategy
    private Set<ExportStrategy> exportStrategies = new HashSet<>(Arrays.asList(ExportStrategy.KAFKA));

    public SparqlFluoQueryBuilder setSparql(final String sparql) {
        this.sparql = Preconditions.checkNotNull(sparql);
        return this;
    }

    public SparqlFluoQueryBuilder setTupleExpr(final TupleExpr te) {
        this.te = Preconditions.checkNotNull(te);
        return this;
    }

    /**
     * Sets the FluoQuery id as generated by {@link NodeType#generateNewFluoIdForType(NodeType)} or
     * {@link NodeType#generateNewIdForType(NodeType, String)}, where NodeType is of type Query.
     * @param queryId for the {@link FluoQuery}
     * @return SparqlFluoQueryBuilder for chaining method calls
     */
    public SparqlFluoQueryBuilder setFluoQueryId(final String queryId) {
        this.queryId = Preconditions.checkNotNull(queryId);
        return this;
    }

    public SparqlFluoQueryBuilder setNodeIds(final NodeIds nodeIds) {
        this.nodeIds = Preconditions.checkNotNull(nodeIds);
        return this;
    }

    public SparqlFluoQueryBuilder setExportStrategies(final Set<ExportStrategy> exportStrategies) {
        this.exportStrategies = exportStrategies;
        return this;
    }

    public SparqlFluoQueryBuilder setJoinBatchSize(final int joinBatchSize) {
        Preconditions.checkArgument(joinBatchSize > 0);
        this.joinBatchSize = Optional.of(joinBatchSize);
        return this;
    }

    public FluoQuery build() throws UnsupportedQueryException {
        Preconditions.checkNotNull(sparql);
        Preconditions.checkNotNull(queryId);
        Preconditions.checkNotNull(exportStrategies);

        if(nodeIds == null) {
            nodeIds = new NodeIds();
        }

        if(te == null) {
            final SPARQLParser parser = new SPARQLParser();
            ParsedQuery pq;
            try {
                pq = parser.parseQuery(sparql, null);
            } catch (final MalformedQueryException e) {
               throw new RuntimeException(e);
            }
            te = pq.getTupleExpr();
        }

        PeriodicQueryUtil.placePeriodicQueryNode(te);
        final String childNodeId = nodeIds.getOrMakeId(te);

        final FluoQuery.Builder fluoQueryBuilder = FluoQuery.builder();
        final QueryMetadata.Builder queryBuilder = QueryMetadata.builder(queryId);
        //sets {@link QueryType} and VariableOrder
        setVarOrderAndQueryType(queryBuilder, te);
        queryBuilder
            .setSparql(sparql)
            .setChildNodeId(childNodeId)
            .setExportStrategies(exportStrategies)
            .setJoinBatchSize(joinBatchSize);

        fluoQueryBuilder.setQueryMetadata(queryBuilder);

        setChildMetadata(fluoQueryBuilder, childNodeId, queryBuilder.getVariableOrder(), queryId);

        final NewQueryVisitor visitor = new NewQueryVisitor(fluoQueryBuilder, nodeIds);
        te.visit( visitor );

        final FluoQuery fluoQuery = fluoQueryBuilder.build();
        return fluoQuery;
    }

    /**
     * A data structure that creates and keeps track of Node IDs for the nodes
     * of a {@link ParsedQuery}. This structure should only be used while creating
     * a new PCJ in Fluo and disposed of afterwards.
     */
    @DefaultAnnotation(NonNull.class)
    public static final class NodeIds {

        /**
         * Maps from a parsed SPARQL query's node to the Node ID that has been assigned to it.
         */
        private final Map<QueryModelNode, String> nodeIds = new HashMap<>();

        /**
         * Checks if a SPARQL query's node has had a Node ID assigned to it yet.
         *
         * @param node - The node to check. (not null)
         * @return {@code true} if the {@code node} has had a Node ID assigned to
         *   it; otherwise {@code false}.
         */
        public boolean hasId(final QueryModelNode node) {
            checkNotNull(node);
            return nodeIds.containsKey(node);
        }

        /**
         * Get the Node ID that has been assigned a specific node of a SPARQL query.
         *
         * @param node - The node whose ID will be fetched. (not null)
         * @return The Node ID that has been assigned to {@code node} if one
         *   has been assigned to it; otherwise {@code absent}.
         */
        public Optional<String> getId(final QueryModelNode node) {
            checkNotNull(node);
            return Optional.ofNullable( nodeIds.get(node) );
        }

        /**
         * Get the Node ID that has been assigned to a specific node of a SPARQL
         * query. If one hasn't been assigned yet, then one will be generated
         * and assigned to it.
         *
         * @param node - The node whose ID will be fetched or generated. (not null)
         * @return The Node ID that is assigned to {@code node}.
         */
        public String getOrMakeId(final QueryModelNode node) {
            checkNotNull(node);

            // If a Node ID has already been assigned, return it.
            if(nodeIds.containsKey(node)){
                return nodeIds.get(node);
            }

            // Otherwise create a new ID and store it for later.
            final String id = makeId(node);
            nodeIds.put(node, id);
            return id;
        }

        private String makeId(final QueryModelNode node) {
            checkNotNull(node);

            // Create the prefix of the id. This makes it a little bit more human readable.
            String prefix;
            if (node instanceof StatementPattern) {
                prefix = SP_PREFIX;
            } else if (node instanceof Filter) {
                prefix = FILTER_PREFIX;
            } else if (node instanceof Join || node instanceof LeftJoin) {
                prefix = JOIN_PREFIX;
            } else if (node instanceof Projection) {
                prefix = PROJECTION_PREFIX;
            } else if(node instanceof Extension) {
                prefix = AGGREGATION_PREFIX;
            }  else if (node instanceof Reduced) {
                prefix = CONSTRUCT_PREFIX;
            } else if(node instanceof PeriodicQueryNode) {
                prefix = PERIODIC_QUERY_PREFIX;
            } else {
                throw new IllegalArgumentException("Node must be of type {StatementPattern, Join, Filter, Extension, Projection} but was " + node.getClass());
            }

            final String unique = UUID.randomUUID().toString().replaceAll("-", "");
            // Put them together to create the Node ID.
            return prefix + "_" + unique;
        }

    }

    /**
     * Visits each node of a {@link ParsedQuery} and adds information about
     * the node to a {@link FluoQuery.Builder}. This information is used by the
     * application's observers to incrementally update a PCJ.
     */
    public static class NewQueryVisitor extends AbstractQueryModelVisitor<RuntimeException> {

        private final NodeIds nodeIds;
        private final FluoQuery.Builder fluoQueryBuilder;

        /**
         * Constructs an instance of {@link NewQueryVisitor}.
         *
         * @param fluoQueryBuilder - The builder that will be updated by this
         *   vistior to include metadata about each of the query nodes. (not null)
         * @param nodeIds - The NodeIds object is passed in so that other parts
         *   of the application may look up which ID is associated with each
         *   node of the query.
         */
        public NewQueryVisitor(final FluoQuery.Builder fluoQueryBuilder, final NodeIds nodeIds) {
            this.fluoQueryBuilder = checkNotNull(fluoQueryBuilder);
            this.nodeIds = checkNotNull(nodeIds);
        }

        /**
         * If we encounter an Extension node that contains a Group, then we've found an aggregation.
         */
        @Override
        public void meet(final Extension node) {
            final TupleExpr arg = node.getArg();
            if(arg instanceof Group) {
                final Group group = (Group) arg;

                // Get the Aggregation Node's id.
                final String aggregationId = nodeIds.getOrMakeId(node);

                // Get the group's child node id. This call forces it to be a supported child type.
                final TupleExpr child = group.getArg();
                final String childNodeId = nodeIds.getOrMakeId( child );

                // Get the list of group by binding names.
                VariableOrder groupByVariableOrder = null;
                if(!group.getGroupBindingNames().isEmpty()) {
                    groupByVariableOrder = new VariableOrder(group.getGroupBindingNames());
                } else {
                    groupByVariableOrder = new VariableOrder();
                }


                // The aggregations that need to be performed are the Group Elements.
                final List<AggregationElement> aggregations = new ArrayList<>();
                for(final GroupElem groupElem : group.getGroupElements()) {
                    // Figure out the type of the aggregation.
                    final AggregateOperator operator = groupElem.getOperator();
                    final Optional<AggregationType> type = AggregationType.byOperatorClass( operator.getClass() );

                    // If the type is one we support, create the AggregationElement.
                    if(type.isPresent()) {
                        final String resultBindingName = groupElem.getName();

                        final AtomicReference<String> aggregatedBindingName = new AtomicReference<>();
                        groupElem.visitChildren(new AbstractQueryModelVisitor<RuntimeException>() {
                            @Override
                            public void meet(final Var node) {
                                aggregatedBindingName.set( node.getName() );
                            }
                        });

                        aggregations.add( new AggregationElement(type.get(), aggregatedBindingName.get(), resultBindingName) );
                    }
                }

                // Update the aggregation's metadata.
                AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(aggregationId).orNull();
                if(aggregationBuilder == null) {
                    aggregationBuilder = AggregationMetadata.builder(aggregationId);
                    fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
                }

                aggregationBuilder.setChildNodeId(childNodeId);
                aggregationBuilder.setGroupByVariableOrder(groupByVariableOrder);

                final Set<String> aggregationVars = getVarsToDelete(groupByVariableOrder.getVariableOrders(), aggregationBuilder.getVariableOrder().getVariableOrders());
                FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.DeleteVariable, Lists.newArrayList(aggregationVars), aggregationId);

                for(final AggregationElement aggregation : aggregations) {
                    aggregationBuilder.addAggregation(aggregation);
                }



                // Update the child node's metadata.
                final Set<String> childVars = getVars(child);
                final VariableOrder childVarOrder = new VariableOrder(childVars);

                setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, aggregationId);
            }

            // Walk to the next node.
            super.meet(node);
        }

        @Override
        public void meet(final StatementPattern node) {
            // Extract metadata that will be stored from the node.
            final String spNodeId = nodeIds.getOrMakeId(node);
            final String pattern = FluoStringConverter.toStatementPatternString(node);

            // Get or create a builder for this node populated with the known metadata.
            StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(spNodeId).orNull();
            if(spBuilder == null) {
                spBuilder = StatementPatternMetadata.builder(spNodeId);
                fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
            }
            spBuilder.setStatementPattern(pattern);
        }

        @Override
        public void meet(final LeftJoin node) {
            // Extract the metadata that will be stored for the node.
            final String leftJoinNodeId = nodeIds.getOrMakeId(node);
            final QueryModelNode left = node.getLeftArg();
            final QueryModelNode right = node.getRightArg();

            // Update the metadata for the JoinMetadata.Builder.
            makeJoinMetadata(leftJoinNodeId, JoinType.LEFT_OUTER_JOIN, left, right);

            // Walk to the next node.
            super.meet(node);
        }

        @Override
        public void meet(final Join node) {
            // Extract the metadata that will be stored from the node.
            final String joinNodeId = nodeIds.getOrMakeId(node);
            final QueryModelNode left = node.getLeftArg();
            final QueryModelNode right = node.getRightArg();

            // Update the metadata for the JoinMetadata.Builder.
            makeJoinMetadata(joinNodeId, JoinType.NATURAL_JOIN, left, right);

            // Walk to the next node.
            super.meet(node);
        }

        private void makeJoinMetadata(final String joinNodeId, final JoinType joinType, final QueryModelNode left, final QueryModelNode right) {
            final String leftChildNodeId = nodeIds.getOrMakeId(left);
            final String rightChildNodeId = nodeIds.getOrMakeId(right);

            // Get or create a builder for this node populated with the known metadata.
            JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(joinNodeId).orNull();
            if(joinBuilder == null) {
                joinBuilder = JoinMetadata.builder(joinNodeId);
                fluoQueryBuilder.addJoinMetadata(joinBuilder);
            }
            joinBuilder.setJoinType(joinType);
            joinBuilder.setLeftChildNodeId( leftChildNodeId );
            joinBuilder.setRightChildNodeId( rightChildNodeId );
            if(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().isPresent()) {
                joinBuilder.setJoinBatchSize(fluoQueryBuilder.getQueryBuilder().getJoinBatchSize().get());
            }

            // Figure out the variable order for each child node's binding set and
            // store it. Also store that each child node's parent is this join.
            final Set<String> leftVars = getVars((TupleExpr)left);
            final Set<String> rightVars = getVars((TupleExpr) right);
            final JoinVarOrders varOrders = getJoinArgVarOrders(leftVars, rightVars);

            // Create or update the left child's variable order and parent node id.
            final VariableOrder leftVarOrder = varOrders.getLeftVarOrder();
            setChildMetadata(fluoQueryBuilder, leftChildNodeId, leftVarOrder, joinNodeId);

            // Create or update the right child's variable order and parent node id.
            final VariableOrder rightVarOrder = varOrders.getRightVarOrder();
            setChildMetadata(fluoQueryBuilder, rightChildNodeId, rightVarOrder, joinNodeId);
        }

        @Override
        public void meet(final Filter node) {

            // Get or create a builder for this node populated with the known metadata.
            final String filterId = nodeIds.getOrMakeId(node);

            FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(filterId).orNull();
            if(filterBuilder == null) {
                filterBuilder = FilterMetadata.builder(filterId);
                fluoQueryBuilder.addFilterMetadata(filterBuilder);
            }

            String filterString;
            try {
                filterString = FilterSerializer.serialize(node);
            } catch (final FilterParseException e) {
                throw new RuntimeException(e);
            }
            filterBuilder.setFilterSparql(filterString);

            final QueryModelNode child = node.getArg();
            if(child == null) {
                throw new IllegalArgumentException("Filter arg connot be null.");
            }

            final String childNodeId = nodeIds.getOrMakeId(child);
            filterBuilder.setChildNodeId(childNodeId);

            // Update the child node's metadata.
            final Set<String> childVars = getVars((TupleExpr)child);
            final VariableOrder childVarOrder = new VariableOrder(childVars);
            setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, filterId);

            // Walk to the next node.
            super.meet(node);
        }

        @Override
        public void meetOther(final QueryModelNode qNode) {
            if (qNode instanceof PeriodicQueryNode) {
                final PeriodicQueryNode node = (PeriodicQueryNode) qNode;
                // Get or create a builder for this node populated with the
                // known metadata.
                final String periodicId = nodeIds.getOrMakeId(node);

                PeriodicQueryMetadata.Builder periodicBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
                if (periodicBuilder == null) {
                    periodicBuilder = PeriodicQueryMetadata.builder();
                    periodicBuilder.setNodeId(periodicId);
                    fluoQueryBuilder.addPeriodicQueryMetadata(periodicBuilder);
                }
                periodicBuilder.setWindowSize(node.getWindowSize());
                periodicBuilder.setPeriod(node.getPeriod());
                periodicBuilder.setTemporalVariable(node.getTemporalVariable());
                periodicBuilder.setUnit(node.getUnit());

                final QueryModelNode child = node.getArg();
                if (child == null) {
                    throw new IllegalArgumentException("PeriodicQueryNode child arg connot be null.");
                }

                final String childNodeId = nodeIds.getOrMakeId(child);
                periodicBuilder.setChildNodeId(childNodeId);

                // Update the child node's metadata.
                final Set<String> childVars = getVars((TupleExpr) child);
                final VariableOrder childVarOrder = new VariableOrder(childVars);
                setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, periodicId);

                // update variable order of this node and all ancestors to
                // include BIN_ID binding as
                // first variable in the ordering
                FluoQueryUtils.updateVarOrders(fluoQueryBuilder, UpdateAction.AddVariable,
                        Collections.singletonList(IncrementalUpdateConstants.PERIODIC_BIN_ID), periodicId);
                // Walk to the next node.
                node.getArg().visit(this);
            }
        }


        @Override
        public void meet(final Projection node) {
            // Create a builder for this node populated with the metadata.
            final String queryId = nodeIds.getOrMakeId(node);

            ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(queryId).orNull();
            if (projectionBuilder == null) {
                projectionBuilder = ProjectionMetadata.builder(queryId);
                fluoQueryBuilder.addProjectionBuilder(projectionBuilder);
            }

            final QueryModelNode child = node.getArg();
            if(child == null) {
                throw new IllegalArgumentException("Projection arg connot be null.");
            }

            final String childNodeId = nodeIds.getOrMakeId(child);
            projectionBuilder.setChildNodeId(childNodeId);
            projectionBuilder.setProjectedVars(projectionBuilder.getVariableOrder());

            // Update the child node's metadata.
            final Set<String> childVars = getVars((TupleExpr)child);
            final VariableOrder childVarOrder = new VariableOrder(childVars);

            setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, queryId);

            // Walk to the next node.
            super.meet(node);
        }


        @Override
        public void meet(final Reduced node) {
            //create id, initialize ConstructQueryMetadata builder, register ConstructQueryMetadata
            //builder with FluoQueryBuilder, and add metadata that we currently have
            final String constructId = nodeIds.getOrMakeId(node);

            ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull();
            if(constructBuilder == null) {
                constructBuilder = ConstructQueryMetadata.builder();
                constructBuilder.setNodeId(constructId);
                fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
            }

            //get child node
            QueryModelNode child = node.getArg();
            Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection);
            final UnaryTupleOperator unary = (UnaryTupleOperator) child;

            //get ProjectionElemList to build ConstructGraph
            final List<ProjectionElemList> projections = new ArrayList<>();
            if(unary instanceof Projection) {
                projections.add(((Projection) unary).getProjectionElemList());
            } else {
                projections.addAll(((MultiProjection)unary).getProjections());
            }

            //get ExtensionElems to build ConstructGraph
            final QueryModelNode grandChild = unary.getArg();
            Preconditions.checkArgument(grandChild instanceof Extension);
            final Extension extension = (Extension) grandChild;
            final List<ExtensionElem> extensionElems = extension.getElements();
            final ConstructGraph graph = getConstructGraph(projections, extensionElems);
            constructBuilder.setConstructGraph(graph);

            //set child to the next node we care about in Fluo
            //if Extension's arg is a Group node, then it is an Aggregation, so set child to Extension
            //otherwise set child to Extension's child (only care about Extensions if they are Aggregations)
            if(extension.getArg() instanceof Group) {
                child = extension;
            } else {
                child = extension.getArg();
            }

            //Set the child node in the ConstructQueryMetadataBuilder
            final String childNodeId = nodeIds.getOrMakeId(child);
            constructBuilder.setChildNodeId(childNodeId);

            // Update the child node's metadata.
            final Set<String> childVars = getVars((TupleExpr)child);
            final VariableOrder childVarOrder = new VariableOrder(childVars);
            setChildMetadata(fluoQueryBuilder, childNodeId, childVarOrder, constructId);

            //fast forward visitor to next node we care about
            child.visit(this);
        }

        private ConstructGraph getConstructGraph(final List<ProjectionElemList> projections, final List<ExtensionElem> extensionElems) {
            final Map<String, Value> valueMap = new HashMap<>();
            //create valueMap to associate source names with Values
            for(final ExtensionElem elem: extensionElems) {
                final String name = elem.getName();
                final ValueExpr expr = elem.getExpr();
                if(expr instanceof ValueConstant) {
                    final Value value = ((ValueConstant) expr).getValue();
                    valueMap.put(name, value);
                } else if(expr instanceof BNodeGenerator) {
                    valueMap.put(name, VF.createBNode(UUID.randomUUID().toString()));
                }
            }

            final Set<ConstructProjection> constructProj = new HashSet<>();
            //build ConstructProjection for each ProjectionElemList
            for(final ProjectionElemList list: projections) {
                validateProjectionElemList(list);
                final List<Var> vars = new ArrayList<>();
                for(final ProjectionElem elem: list.getElements()) {
                    final String sourceName = elem.getSourceName();
                    final Var var = new Var(sourceName);
                    if(valueMap.containsKey(sourceName)) {
                        var.setValue(valueMap.get(sourceName));
                    }
                    vars.add(var);
                }
                constructProj.add(new ConstructProjection(vars.get(0), vars.get(1), vars.get(2)));
            }

            return new ConstructGraph(constructProj);
        }

        private Set<String> getVarsToDelete(final Collection<String> groupByVars, final Collection<String> varOrderVars) {
            final Set<String> groupBySet = Sets.newHashSet(groupByVars);
            final Set<String> varOrderSet = Sets.newHashSet(varOrderVars);

            return Sets.difference(varOrderSet, groupBySet);
        }

        private void validateProjectionElemList(final ProjectionElemList list) {
            final List<ProjectionElem> elements = list.getElements();
            checkArgument(elements.size() == 3);
            checkArgument(elements.get(0).getTargetName().equals("subject"));
            checkArgument(elements.get(1).getTargetName().equals("predicate"));
            checkArgument(elements.get(2).getTargetName().equals("object"));
        }

        /**
         * Get the non-constant variables from a {@link TupleExpr}.
         *
         * @param node - The node to inspect for variables. (not null)
         * @return The non-constant variables that were part of the node.
         */
        private Set<String> getVars(final TupleExpr node) {
            checkNotNull(node);

            final Set<String> vars = Sets.newHashSet();

            for(final String bindingName : node.getBindingNames()) {
                if (!VarNameUtils.isConstant(bindingName)) {
                    vars.add(bindingName);
                }
            }

            return vars;
        }

        /**
         * Holds the Variable Order of the binding sets for the children of a join node.
         */
        @Immutable
        @DefaultAnnotation(NonNull.class)
        private static final class JoinVarOrders {
            private final VariableOrder leftVarOrder;
            private final VariableOrder rightVarOrder;

            /**
             * Constructs an instance of {@link }.
             *
             * @param leftVarOrder - The left child's Variable Order. (not null)
             * @param rightVarOrder - The right child's Variable Order. (not null)
             */
            public JoinVarOrders(final VariableOrder leftVarOrder, final VariableOrder rightVarOrder) {
                this.leftVarOrder = checkNotNull(leftVarOrder);
                this.rightVarOrder = checkNotNull(rightVarOrder);
            }

            /**
             * @return The left child's Variable Order.
             */
            public VariableOrder getLeftVarOrder() {
                return leftVarOrder;
            }

            /**
             * @return The right child's Variable Order.
             */
            public VariableOrder getRightVarOrder() {
                return rightVarOrder;
            }
        }

        /**
         * Shifts the common variables between the two children to the left so
         * that Accumulo scans when performing the join are efficient.
         *
         * @param leftVars - The left child's variables. (not null)
         * @param rightVars - The right child's variables. (not null)
         * @return An object holding the left and right children's variable orders.
         */
        private JoinVarOrders getJoinArgVarOrders(final Set<String> leftVars, final Set<String> rightVars) {
            checkNotNull(leftVars);
            checkNotNull(rightVars);

            // Find the common variables between the left and right children. The common vars
            // are stored in a list to ensure iteration order is always the same.
            final List<String> commonVars = new ArrayList<>( Sets.intersection(leftVars, rightVars) );

            // Push all of the common variables to the left for each child's vars.
            final List<String> leftVarOrder = leftShiftCommonVars(commonVars, leftVars);
            final List<String> rightVarOrder = leftShiftCommonVars(commonVars, rightVars);
            return new JoinVarOrders(new VariableOrder(leftVarOrder), new VariableOrder(rightVarOrder));
        }

        /**
         * Orders the set of common variables so that all of the common ones
         * are on the left in the same order they have been provided. The rest
         * of the variables are iterated over and added to the end of the list
         * in no particular order.
         *
         * @param commonVars - An ordered list of variables that must appear on the left. (not null)
         * @param allVars - The variables that need to be ordered. (not null)
         * @return A list of variables ordered as described above.
         */
        private List<String> leftShiftCommonVars(final List<String> commonVars, final Collection<String> allVars) {
            checkNotNull(commonVars);
            checkNotNull(allVars);

            final List<String> shifted = Lists.newArrayList(commonVars);
            for(final String var : allVars) {
                if(!shifted.contains(var)) {
                    shifted.add(var);
                }
            }
            return shifted;
        }
    }

    private void setVarOrderAndQueryType(final QueryMetadata.Builder builder, final TupleExpr te) {
        final QueryMetadataLocator locator = new QueryMetadataLocator();
        try {
            te.visit(locator);
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }

        builder.setVarOrder(locator.getVarOrder());
        builder.setQueryType(locator.getQueryType());
    }

    public static class QueryMetadataLocator extends AbstractQueryModelVisitor<Exception> {

        private VariableOrder varOrder;
        private QueryType queryType;

        public VariableOrder getVarOrder() {
            return varOrder;
        }

        public QueryType getQueryType() {
            return queryType;
        }

        @Override
        public void meet(final Projection node) throws Exception {
            final Set<String> bindingNames = node.getBindingNames();
            if(varOrder == null) {
                varOrder = new VariableOrder(bindingNames);
            }

            if(queryType == null) {
                queryType = QueryType.PROJECTION;
            }
            super.meet(node);
        }

        @Override
        public void meet(final Reduced node) throws Exception {
            if(varOrder == null) {
                varOrder = getConstructGraphVarOrder(node);
            }

            if(queryType == null) {
                queryType = QueryType.CONSTRUCT;
            }
            super.meet(node);
        }

        @Override
        public void meetOther(final QueryModelNode node) throws Exception {
            if (node instanceof PeriodicQueryNode) {
                queryType = QueryType.PERIODIC;
            } else {
                super.meetOther(node);
            }
        }
    }

    private static VariableOrder getConstructGraphVarOrder(final Reduced node) {

        //get child node
          final QueryModelNode child = node.getArg();
          Preconditions.checkArgument(child instanceof Projection || child instanceof MultiProjection);
          final UnaryTupleOperator unary = (UnaryTupleOperator) child;

          //get ProjectionElemList to build ConstructGraph
          final List<ProjectionElemList> projections = new ArrayList<>();
          if(unary instanceof Projection) {
              projections.add(((Projection) unary).getProjectionElemList());
          } else {
              projections.addAll(((MultiProjection)unary).getProjections());
          }

          return getConstructGraphVarOrder(projections);
      }

    private static VariableOrder getConstructGraphVarOrder(List<ProjectionElemList> projections) {
        final Set<String> varOrders = new HashSet<>();

        for(final ProjectionElemList elems: projections) {
            for(final ProjectionElem elem: elems.getElements()) {
                final String name = elem.getSourceName();
                if (!VarNameUtils.isConstant(name) && !VarNameUtils.isAnonymous(name)) {
                    varOrders.add(name);
                }
            }
        }

        return new VariableOrder(varOrders);
    }


    /**
     * Update a query node's metadata to include it's binding set variable order
     * and it's parent node id. This information is only known when handling
     * the parent node.
     *
     * @param fluoQueryBuilder - Builder whose metadata is updatad
     * @param childNodeId - The node ID of the child node.
     * @param childVarOrder - The variable order of the child node's binding sets.
     * @param parentNodeId - The node ID that consumes the child's binding sets.
     */
    private static void setChildMetadata(final FluoQuery.Builder fluoQueryBuilder, final String childNodeId, final VariableOrder childVarOrder, final String parentNodeId) {
        checkNotNull(childNodeId);
        checkNotNull(childVarOrder);
        checkNotNull(parentNodeId);

        final NodeType childType = NodeType.fromNodeId(childNodeId).get();
        switch (childType) {
        case STATEMENT_PATTERN:
            StatementPatternMetadata.Builder spBuilder = fluoQueryBuilder.getStatementPatternBuilder(childNodeId).orNull();
            if (spBuilder == null) {
                spBuilder = StatementPatternMetadata.builder(childNodeId);
                fluoQueryBuilder.addStatementPatternBuilder(spBuilder);
            }

            spBuilder.setVarOrder(childVarOrder);
            spBuilder.setParentNodeId(parentNodeId);
            break;

        case JOIN:
            JoinMetadata.Builder joinBuilder = fluoQueryBuilder.getJoinBuilder(childNodeId).orNull();
            if (joinBuilder == null) {
                joinBuilder = JoinMetadata.builder(childNodeId);
                fluoQueryBuilder.addJoinMetadata(joinBuilder);
            }

            joinBuilder.setVarOrder(childVarOrder);
            joinBuilder.setParentNodeId(parentNodeId);
            break;

        case FILTER:
            FilterMetadata.Builder filterBuilder = fluoQueryBuilder.getFilterBuilder(childNodeId).orNull();
            if (filterBuilder == null) {
                filterBuilder = FilterMetadata.builder(childNodeId);
                fluoQueryBuilder.addFilterMetadata(filterBuilder);
            }

            filterBuilder.setVarOrder(childVarOrder);
            filterBuilder.setParentNodeId(parentNodeId);
            break;

        case AGGREGATION:
            AggregationMetadata.Builder aggregationBuilder = fluoQueryBuilder.getAggregateBuilder(childNodeId).orNull();
            if (aggregationBuilder == null) {
                aggregationBuilder = AggregationMetadata.builder(childNodeId);
                fluoQueryBuilder.addAggregateMetadata(aggregationBuilder);
            }

            aggregationBuilder.setVarOrder(childVarOrder);
            aggregationBuilder.setParentNodeId(parentNodeId);
            break;

        case PROJECTION:
            ProjectionMetadata.Builder projectionBuilder = fluoQueryBuilder.getProjectionBuilder(childNodeId).orNull();
            if(projectionBuilder == null) {
                projectionBuilder = ProjectionMetadata.builder(childNodeId);
                fluoQueryBuilder.addProjectionBuilder(projectionBuilder);
            }

            projectionBuilder.setVarOrder(childVarOrder);
            projectionBuilder.setParentNodeId(parentNodeId);
            break;

        case QUERY:
            throw new IllegalArgumentException("A QUERY node cannot be the child of another node.");

        case CONSTRUCT:
            ConstructQueryMetadata.Builder constructBuilder = fluoQueryBuilder.getConstructQueryBuilder().orNull();
            if(constructBuilder == null) {
                constructBuilder = ConstructQueryMetadata.builder();
                constructBuilder.setNodeId(childNodeId);
                fluoQueryBuilder.setConstructQueryMetadata(constructBuilder);
            }

            Preconditions.checkArgument(childNodeId.equals(constructBuilder.getNodeId()));
            constructBuilder.setVarOrder(childVarOrder);
            constructBuilder.setParentNodeId(parentNodeId);
            break;

        case PERIODIC_QUERY:
            PeriodicQueryMetadata.Builder periodicQueryBuilder = fluoQueryBuilder.getPeriodicQueryBuilder().orNull();
            if (periodicQueryBuilder == null) {
                periodicQueryBuilder = PeriodicQueryMetadata.builder();
                periodicQueryBuilder.setNodeId(childNodeId);
                fluoQueryBuilder.addPeriodicQueryMetadata(periodicQueryBuilder);
            }
            periodicQueryBuilder.setVarOrder(childVarOrder);
            periodicQueryBuilder.setParentNodeId(parentNodeId);
            break;

        default:
            throw new IllegalArgumentException("Unsupported NodeType: " + childType);
        }
    }
}