/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.core.mapping;

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.commons.lang3.Validate;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.plan.wayangplan.InputSlot;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.OperatorBase;
import org.apache.wayang.core.plan.wayangplan.OutputSlot;
import org.apache.wayang.core.plan.wayangplan.PlanTraversal;
import org.apache.wayang.core.plan.wayangplan.TopDownPlanVisitor;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;

/**
 * A subplan pattern describes a class of subplans in a {@link WayangPlan}.
 * <p><i>NB: Currently, only such patterns are tested and supported that form a chain of operators, i.e., no DAGs
 * are allowed and at most one input and one output operator.</i></p>
 * <p>Although this class extends {@link OperatorBase}, it cannot be visited: {@link #accept} always throws.</p>
 */
public class SubplanPattern extends OperatorBase {

    /**
     * The {@link OperatorPattern} at which this instance starts, i.e., the one providing this instance's inputs.
     */
    private final OperatorPattern inputPattern;

    /**
     * The {@link OperatorPattern} at which this instance ends, i.e., the one providing this instance's outputs.
     */
    private final OperatorPattern outputPattern;

    /**
     * Creates a new instance that matches only a single operator. The given pattern serves as both the input
     * and the output pattern of the new instance.
     *
     * @param operatorPattern the only operator pattern
     * @return the new instance
     * @throws NullPointerException if {@code operatorPattern} is {@code null}
     */
    public static final SubplanPattern createSingleton(OperatorPattern operatorPattern) {
        return fromOperatorPatterns(operatorPattern, operatorPattern);
    }

    /**
     * Creates a new instance that matches a graph of operator patterns.
     *
     * @param inputOperatorPattern  the only operator pattern that has inputs wrt. the subplan pattern
     * @param outputOperatorPattern the only operator pattern that has outputs wrt. the subplan pattern
     * @return the new instance
     * @throws NullPointerException if any of the arguments is {@code null}
     */
    public static final SubplanPattern fromOperatorPatterns(OperatorPattern inputOperatorPattern,
                                                            OperatorPattern outputOperatorPattern) {
        return new SubplanPattern(inputOperatorPattern, outputOperatorPattern);
    }

    /**
     * Creates a new instance whose input slots are those of the {@code inputPattern} and whose output slots are
     * those of the {@code outputPattern}.
     *
     * @param inputPattern  the start {@link OperatorPattern}
     * @param outputPattern the end {@link OperatorPattern}
     * @throws NullPointerException if any of the arguments is {@code null}
     */
    private SubplanPattern(OperatorPattern inputPattern, OperatorPattern outputPattern) {
        // The null checks have to be inlined, because no statement may precede the super constructor call.
        super(
                Objects.requireNonNull(inputPattern, "inputPattern must not be null").getAllInputs(),
                Objects.requireNonNull(outputPattern, "outputPattern must not be null").getAllOutputs(),
                false
        );
        this.inputPattern = inputPattern;
        this.outputPattern = outputPattern;
    }

    /**
     * Match this pattern against a plan. Every invocation uses a fresh {@link Matcher}, so the returned
     * {@link List} is not shared between invocations.
     *
     * @param plan     the plan to match against
     * @param minEpoch the (inclusive) minimum epoch value for matched subplans
     * @return all matches
     */
    public List<SubplanMatch> match(WayangPlan plan, int minEpoch) {
        return new Matcher(minEpoch).match(plan);
    }

    /**
     * @return the {@link OperatorPattern} at which this instance starts
     */
    public OperatorPattern getInputPattern() {
        return this.inputPattern;
    }

    /**
     * @return the {@link OperatorPattern} at which this instance ends
     */
    public OperatorPattern getOutputPattern() {
        return this.outputPattern;
    }

    /**
     * Patterns cannot be visited; this method always fails.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <Payload, Return> Return accept(TopDownPlanVisitor<Payload, Return> visitor, OutputSlot<?> outputSlot, Payload payload) {
        throw new UnsupportedOperationException("Pattern does not accept visitors.");
    }

    /**
     * Makes sure that nobody expects more from the matching than it can handle: an {@link OperatorPattern} may
     * have at most one occupied {@link InputSlot}.
     *
     * @param pattern the {@link OperatorPattern} to check
     * @throws WayangException if more than one {@link InputSlot} of the {@code pattern} has an occupant
     */
    private static void ensureAtMostOneOccupiedInput(OperatorPattern pattern) {
        if (pattern.getNumInputs() <= 1) {
            // With at most one input, there cannot be more than one occupied input.
            return;
        }
        final long numOccupiedInputs = Arrays.stream(pattern.getAllInputs())
                .map(InputSlot::getOccupant)
                .filter(Objects::nonNull)
                .count();
        if (numOccupiedInputs > 1) {
            throw new WayangException("Cannot match pattern: Operator with more than one occupied input not supported, yet.");
        }
    }

    /**
     * This class encapsulates the functionality to match a subplan pattern against a plan. It works as follows:
     * <ol>
     * <li>for each operator in the plan, start a new match attempt</li>
     * <li>iterate through the graph pattern</li>
     * <li>co-iterate the actual graph</li>
     * <li>if we could co-iterate the two, create a match for the match attempt</li>
     * </ol>
     */
    private class Matcher {

        /**
         * Collects all the matches.
         */
        private final List<SubplanMatch> matches = new LinkedList<>();

        /**
         * The (inclusive) minimum epoch value for matched subplans.
         */
        private final int minEpoch;

        /**
         * Creates a new instance.
         *
         * @param minEpoch see {@link #minEpoch}
         */
        public Matcher(int minEpoch) {
            this.minEpoch = minEpoch;
        }

        /**
         * Run this instance over the given {@link WayangPlan}.
         *
         * @param plan the {@link WayangPlan} to search for matches
         * @return a {@link List} of all {@link SubplanMatch}es established by the run
         */
        public List<SubplanMatch> match(WayangPlan plan) {
            // Start an attempt to match from each operator that is upstream-reachable from one of the WayangPlan sinks.
            PlanTraversal.upstream().traversingHierarchically()
                    .withCallback(this::attemptMatchFrom)
                    .traverse(plan.getSinks());
            return this.matches;
        }

        /**
         * Traversal callback: try to match the enclosing {@link SubplanPattern}, starting with its output pattern
         * at the given operator.
         *
         * @param operator       the operator that should be matched with the output operator pattern
         * @param fromInputSlot  must be {@code null}, as only upstream traversals are supported
         * @param fromOutputSlot the {@link OutputSlot} handed in by the traversal; not needed for the matching
         * @throws IllegalArgumentException if {@code fromInputSlot} is not {@code null}
         */
        private void attemptMatchFrom(Operator operator, InputSlot<?> fromInputSlot, OutputSlot<?> fromOutputSlot) {
            Validate.isTrue(fromInputSlot == null, "Cannot handle downstream traversals.");

            // Every attempt gets its own SubplanMatch, so that failed attempts leave no traces.
            final SubplanMatch subplanMatch = new SubplanMatch(SubplanPattern.this);
            this.matchUpstream(SubplanPattern.this.outputPattern, operator, subplanMatch);
        }

        /**
         * Recursively match the given operator pattern and operator including their input operators. Once an
         * operator pattern without input operator patterns has been matched, the {@code subplanMatch} is recorded
         * in {@link #matches}, provided that it satisfies the {@link #minEpoch}.
         *
         * @param pattern      the next {@link OperatorPattern} to match with
         * @param operator     the {@link Operator} that should match with the {@code pattern}
         * @param subplanMatch collects the {@link OperatorMatch}es on success
         * @throws WayangException if the {@code pattern} has more than one occupied input
         */
        private void matchUpstream(OperatorPattern pattern, Operator operator, SubplanMatch subplanMatch) {
            ensureAtMostOneOccupiedInput(pattern);

            // We expect a regular Operator here.
            assert operator.isElementary();

            // Try to match the co-iterated operator (pattern).
            final OperatorMatch operatorMatch = pattern.match(operator);
            if (operatorMatch == null) {
                // If match was not successful, abort. NB: This might change if we have, like, real graph patterns.
                return;
            }
            subplanMatch.addOperatorMatch(operatorMatch);

            // Now we need to go further upstream and try to match the input OperatorPatterns.
            // NB: As of now, there is at most one of them (see the check at the top).
            boolean hasInputOperatorPatterns = false;
            for (int inputIndex = 0; inputIndex < pattern.getNumInputs(); inputIndex++) {
                final OutputSlot<?> patternOccupant = pattern.getEffectiveOccupant(pattern.getInput(inputIndex));
                if (patternOccupant == null) {
                    // No operator pattern feeds this input, so there is nothing to match upstream of it.
                    continue;
                }
                hasInputOperatorPatterns = true;
                final OperatorPattern inputOperatorPattern = (OperatorPattern) patternOccupant.getOwner();

                // Co-iterate the plan: follow the operator's input with the same index.
                final InputSlot<?> outerInputSlot = operator.getOutermostInputSlot(operator.getInput(inputIndex));
                final OutputSlot<?> occupant = outerInputSlot.getOccupant();
                if (occupant != null) {
                    this.matchUpstream(inputOperatorPattern, occupant.getOwner(), subplanMatch);
                }
                // Otherwise, the plan has no operator where the pattern requires one: the attempt yields no match.
            }

            // Without further input operator patterns, the whole pattern has been matched.
            if (!hasInputOperatorPatterns && subplanMatch.getMaximumEpoch() >= this.minEpoch) {
                this.matches.add(subplanMatch);
            }
        }
    }
}
