/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.core.optimizer.costs;

import java.util.Arrays;
import java.util.function.ToDoubleBiFunction;
import java.util.function.ToLongBiFunction;
import java.util.stream.LongStream;
import org.apache.commons.lang3.Validate;
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;

/**
 * Implementation of {@link LoadEstimator} that uses a interval-based cost function.
 */
public class IntervalLoadEstimator extends LoadEstimator {

    private final double correctnessProbablity;

    private final int numInputs, numOutputs;

    private final ToLongBiFunction<long[], long[]> lowerBoundEstimator, upperBoundEstimator;

    public IntervalLoadEstimator(int numInputs,
                                 int numOutputs,
                                 double correctnessProbability,
                                 ToLongBiFunction<long[], long[]> lowerBoundEstimator,
                                 ToLongBiFunction<long[], long[]> upperBoundEstimator) {
        this(numInputs, numOutputs, correctnessProbability, null, lowerBoundEstimator, upperBoundEstimator);
    }

    public IntervalLoadEstimator(int numInputs,
                                 int numOutputs,
                                 double correctnessProbablity,
                                 CardinalityEstimate nullCardinalityReplacement,
                                 ToLongBiFunction<long[], long[]> lowerBoundEstimator,
                                 ToLongBiFunction<long[], long[]> upperBoundEstimator) {

        super(nullCardinalityReplacement);
        this.numInputs = numInputs;
        this.numOutputs = numOutputs;
        this.correctnessProbablity = correctnessProbablity;
        this.lowerBoundEstimator = lowerBoundEstimator;
        this.upperBoundEstimator = upperBoundEstimator;
    }

    /**
     * Utility to create new instances: Rounds the results of a given estimation function.
     */
    @SuppressWarnings("unused")
    public static ToLongBiFunction<long[], long[]> rounded(ToDoubleBiFunction<long[], long[]> f) {
        return (inputCards, outputCards) -> Math.round(f.applyAsDouble(inputCards, outputCards));
    }

    /**
     * Create a {@link LoadEstimator} that accounts a given load for each input and output element.
     *
     * @param operator                    an {@link ExecutionOperator} being addressed by the new instance
     * @param lowerLoadPerCardinalityUnit lower expected load units per input and output data quantum
     * @param upperLoadPerCardinalityUnit upper expected load units per input and output data quantum
     * @param confidence                  confidence in the new instance
     * @param nullCardinalityReplacement  replacement for {@code null}s as {@link CardinalityEstimate}s
     */
    public static <T extends ExecutionOperator> LoadEstimator createIOLinearEstimator(
            T operator,
            long lowerLoadPerCardinalityUnit,
            long upperLoadPerCardinalityUnit,
            double confidence,
            CardinalityEstimate nullCardinalityReplacement) {
        return new IntervalLoadEstimator(
                operator == null ? UNSPECIFIED_NUM_SLOTS : operator.getNumInputs(),
                operator == null ? UNSPECIFIED_NUM_SLOTS : operator.getNumOutputs(),
                confidence,
                nullCardinalityReplacement,
                (inputCards, outputCards) ->
                        lowerLoadPerCardinalityUnit * LongStream.concat(
                                Arrays.stream(inputCards),
                                Arrays.stream(outputCards)
                        ).sum(),
                (inputCards, outputCards) ->
                        upperLoadPerCardinalityUnit * LongStream.concat(
                                Arrays.stream(inputCards),
                                Arrays.stream(outputCards)
                        ).sum()
        );
    }

    @Override
    public LoadEstimate calculate(EstimationContext context) {
        final CardinalityEstimate[] inputEstimates = context.getInputCardinalities();
        final CardinalityEstimate[] outputEstimates = context.getOutputCardinalities();
        Validate.isTrue(inputEstimates.length >= this.numInputs || this.numInputs == UNSPECIFIED_NUM_SLOTS,
                "Received %d input estimates, require %d.", inputEstimates.length, this.numInputs);
        Validate.isTrue(outputEstimates.length == this.numOutputs || this.numOutputs == UNSPECIFIED_NUM_SLOTS,
                "Received %d output estimates, require %d.", outputEstimates.length, this.numOutputs);

        long[][] inputEstimateCombinations = this.enumerateCombinations(inputEstimates);
        long[][] outputEstimateCombinations = this.enumerateCombinations(outputEstimates);

        long lowerEstimate = -1, upperEstimate = -1;
        for (int inputEstimateId = 0; inputEstimateId < inputEstimateCombinations.length; inputEstimateId++) {
            for (int outputEstimateId = 0; outputEstimateId < outputEstimateCombinations.length; outputEstimateId++) {
                long estimate = Math.max(this.lowerBoundEstimator.applyAsLong(
                        inputEstimateCombinations[inputEstimateId],
                        outputEstimateCombinations[outputEstimateId]
                ), 0);
                if (lowerEstimate == -1 || estimate < lowerEstimate) {
                    lowerEstimate = estimate;
                }
                if (upperEstimate == -1 || estimate > upperEstimate) {
                    upperEstimate = estimate;
                }
                estimate = Math.max(this.upperBoundEstimator.applyAsLong(
                        inputEstimateCombinations[inputEstimateId],
                        outputEstimateCombinations[outputEstimateId]
                ), 0);
                if (lowerEstimate == -1 || estimate < lowerEstimate) {
                    lowerEstimate = estimate;
                }
                if (upperEstimate == -1 || estimate > upperEstimate) {
                    upperEstimate = estimate;
                }
            }
        }

        double correctnessProbability = this.calculateJointProbability(inputEstimates, outputEstimates)
                * this.correctnessProbablity;
        return new LoadEstimate(lowerEstimate, upperEstimate, correctnessProbability);
    }


}
