/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.core.plan.wayangplan.traversal;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.wayang.core.api.exception.WayangException;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
import org.apache.wayang.core.plan.wayangplan.Operator;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;

/**
 * Encapsulates logic to traverse a {@link WayangPlan} in a topological, bottom-up manner.
 * <p>In a topological traversal, before a node of a DAG is visited, all it predecessors are visited. Moreover,
 * every node is visited only once. Finally, the nodes can propagate information from predecessor to successor.</p>
 */
public abstract class AbstractTopologicalTraversal<
        ActivatorType extends AbstractTopologicalTraversal.Activator<ActivationType>,
        ActivationType extends AbstractTopologicalTraversal.Activation<ActivatorType>
        > {

    protected final Logger logger = LogManager.getLogger(this.getClass());

    /**
     * Perform a traversal starting from source {@link Operator}s and initially activated {@link Operator}s.
     *
     * @return whether the traversal was <i>not</i> aborted
     */
    public final boolean traverse() {
        try {
            final Queue<ActivatorType> activators = this.initializeActivatorQueue();
            if (activators.isEmpty()) {
                throw new AbortException("No activators available.");
            }
            do {
                final ActivatorType activator = activators.poll();
                // Without this double-cast, we run into a compiler bug: https://bugs.openjdk.java.net/browse/JDK-8131744
                activator.process((Queue<Activator<ActivationType>>) (Queue) activators);
            } while (!activators.isEmpty());
        } catch (AbortException e) {
            this.logger.debug("Traversal aborted: {}", e.getMessage());
            return false;
        }
        return true;
    }

    /**
     * Set up a queue of initial {@link Activator}s for an estimation pass.
     */
    private final Queue<ActivatorType> initializeActivatorQueue() {
        // Set up the initial Activators.
        Queue<ActivatorType> activatorQueue = new LinkedList<>(this.getInitialActivators());

        // Fire Activations satisfied from the payloads.
        for (int i = 0; i < this.getNumInitialActivations(); i++) {
            final Collection<ActivationType> activations = this.getInitialActivations(i);
            for (ActivationType activation : activations) {
                ActivatorType activator = activation.getTargetActivator();
                activator.accept(activation);
                if (activator.isActivationComplete()) {
                    activatorQueue.add(activator);
                }
            }
        }
        return activatorQueue;
    }

    protected abstract Collection<ActivatorType> getInitialActivators();

    protected abstract Collection<ActivationType> getInitialActivations(int index);

    protected abstract int getNumInitialActivations();

    /**
     * Wraps a {@link CardinalityEstimator}, thereby caching its input {@link CardinalityEstimate}s and keeping track
     * of its dependent {@link CardinalityEstimator}s.
     */
    public static abstract class Activator<TActivation extends Activation<? extends Activator<TActivation>>> {

        protected final Operator operator;

        protected Activator(Operator operator) {
            this.operator = operator;
        }

        protected abstract boolean isActivationComplete();

        /**
         * Execute this instance, thereby activating new instances and putting them on the queue.
         *
         * @param activatorQueue accepts newly activated {@link CardinalityEstimator}s
         */
        protected void process(Queue<Activator<TActivation>> activatorQueue) {
            assert this.isActivationComplete() : String.format("Cannot process %s: activation not complete.", this);
            Collection<TActivation> successorActivations = this.doWork();
            if (successorActivations == null) {
                throw new AbortException(String.format("%s requested to abort.", this));
            }

            for (TActivation activation : successorActivations) {
                final Activator<TActivation> activator = activation.getTargetActivator();
                activator.accept(activation);
                if (activator.isActivationComplete()) {
                    activatorQueue.add(activator);
                }
            }
        }

        /**
         * Performs the work to be done by this instance and defines the next {@link Activation}s.
         *
         * @return the newly produced {@link Activation}s or {@code null} if traversal should be aborted
         */
        protected abstract Collection<TActivation> doWork();

        protected abstract void accept(TActivation activation);

        @Override
        public String toString() {
            return String.format("%s[%s]", this.getClass().getSimpleName(), this.operator);
        }
    }

    /**
     * Describes a reference to an input of an {@link Activator}.
     */
    public abstract static class Activation<TActivator extends Activator<? extends Activation<TActivator>>> {

        private final TActivator targetActivator;

        protected Activation(TActivator targetActivator) {
            this.targetActivator = targetActivator;
        }

        protected TActivator getTargetActivator() {
            return this.targetActivator;
        }

    }

    /**
     * Declares that the current traversal should be aborted.
     */
    public static class AbortException extends WayangException {

        public AbortException(String message) {
            super(message);
        }

    }

}
