/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.profiler.java;


import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.commons.util.profiledb.instrumentation.StopWatch;
import org.apache.wayang.commons.util.profiledb.model.Experiment;
import org.apache.wayang.commons.util.profiledb.model.Subject;
import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator;
import org.apache.wayang.core.util.WayangCollections;
import org.apache.wayang.java.operators.JavaExecutionOperator;
import org.apache.wayang.profiler.data.DataGenerators;
import org.apache.wayang.profiler.util.ProfilingUtils;

/**
 * Utility to support finding reasonable {@link LoadProfileEstimator}s for {@link JavaExecutionOperator}s.
 */
public class Profiler {

    private static final int GC_RUNS = 1;

    public static void main(String[] args) {
        if (args.length == 0) {
            System.err.printf("Usage: java %s <operator to profile> <cardinality>[,<cardinality>]\n", Profiler.class);
            System.exit(1);
        }

        String operator = args[0];
        List<Integer> cardinalities = Arrays.stream(args[1].split(",")).map(Integer::valueOf).collect(Collectors.toList());
        List<OperatorProfiler.Result> results;

        switch (operator) {
            case "textsource":
                results = profile(OperatorProfilers.createJavaTextFileSourceProfiler(), cardinalities);
                break;
            case "collectionsource":
                results = profile(OperatorProfilers.createJavaCollectionSourceProfiler(), cardinalities);
                break;
            case "map":
                results = profile(OperatorProfilers.createJavaMapProfiler(), cardinalities);
                break;
            case "filter":
                results = profile(OperatorProfilers.createJavaFilterProfiler(), cardinalities);
                break;
            case "flatmap":
                results = profile(OperatorProfilers.createJavaFlatMapProfiler(), cardinalities);
                break;
            case "reduce":
                results = profile(OperatorProfilers.createJavaReduceByProfiler(), cardinalities);
                break;
            case "globalreduce":
                results = profile(OperatorProfilers.createJavaGlobalReduceProfiler(), cardinalities);
                break;
            case "distinct":
            case "distinct-string":
                results = profile(OperatorProfilers.createJavaDistinctProfiler(), cardinalities);
                break;
            case "distinct-integer":
                results = profile(OperatorProfilers.createJavaDistinctProfiler(
                        DataGenerators.createReservoirBasedIntegerSupplier(new ArrayList<>(), 0.7d, new Random(42)),
                        Integer.class
                ), cardinalities);
                break;
            case "sort":
            case "sort-string":
                results = profile(OperatorProfilers.createJavaSortProfiler(), cardinalities);
                break;
            case "sort-integer":
                results = profile(OperatorProfilers.createJavaSortProfiler(
                        DataGenerators.createReservoirBasedIntegerSupplier(new ArrayList<>(), 0.7d, new Random(42)),
                        Integer.class
                ), cardinalities);
                break;
            case "count":
                results = profile(OperatorProfilers.createJavaCountProfiler(), cardinalities);
                break;
            case "groupby":
                results = profile(OperatorProfilers.createJavaMaterializedGroupByProfiler(), cardinalities);
                break;
            case "join":
                results = profile(OperatorProfilers.createJavaJoinProfiler(), cardinalities, cardinalities);
                break;
            case "union":
                results = profile(OperatorProfilers.createJavaUnionProfiler(), cardinalities, cardinalities);
                break;
            case "cartesian":
                results = profile(OperatorProfilers.createJavaCartesianProfiler(), cardinalities, cardinalities);
                break;
            case "callbacksink":
                results = profile(OperatorProfilers.createJavaLocalCallbackSinkProfiler(), cardinalities);
                break;
            case "collect":
                results = profile(OperatorProfilers.createCollectingJavaLocalCallbackSinkProfiler(), cardinalities);
                break;
            case "word-count-split": {
                final Supplier<String> randomStringSupplier = DataGenerators.createRandomStringSupplier(2, 10, new Random(42));
                results = profile(
                        OperatorProfilers.createJavaFlatMapProfiler(
                                () -> String.format("%s %s %s %s %s %s %s %s %s",
                                        randomStringSupplier.get(), randomStringSupplier.get(),
                                        randomStringSupplier.get(), randomStringSupplier.get(),
                                        randomStringSupplier.get(), randomStringSupplier.get(),
                                        randomStringSupplier.get(), randomStringSupplier.get(),
                                        randomStringSupplier.get()),
                                str -> Arrays.asList(str.split(" ")),
                                String.class,
                                String.class
                        ),
                        cardinalities);
                break;
            }
            case "word-count-canonicalize": {
                final Supplier<String> randomStringSupplier = DataGenerators.createRandomStringSupplier(2, 10, new Random(42));
                results = profile(
                        OperatorProfilers.createJavaMapProfiler(
                                randomStringSupplier,
                                word -> new Tuple2<>(word.toLowerCase(), 1),
                                String.class,
                                Tuple2.class
                        ),
                        cardinalities
                );
                break;
            }
            case "word-count-count": {
                final Supplier<String> stringSupplier = DataGenerators.createReservoirBasedStringSupplier(new ArrayList<>(), 0.7, new Random(42), 2, 10);
                results = profile(
                        OperatorProfilers.createJavaReduceByProfiler(
                                () -> new Tuple2<>(stringSupplier.get(), 1),
                                pair -> pair.field0,
                                (p1, p2) -> {
                                    p1.field1 += p2.field1;
                                    return p1;
                                },
                                cast(Tuple2.class),
                                String.class
                        ),
                        cardinalities
                );
                break;
            }
            default:
                System.out.println("Unknown operator: " + operator);
                return;
        }

        System.out.println();
        System.out.println(WayangCollections.getAny(results).getCsvHeader());
        results.forEach(result -> System.out.println(result.toCsvString()));
    }

    private static StopWatch createStopWatch() {
        Experiment experiment = new Experiment("wayang-profiler", new Subject("Wayang", "0.1"));
        return new StopWatch(experiment);
    }

    @SuppressWarnings("unchecked")
    private static <T> Class<T> cast(Class<?> cls) {
        return (Class<T>) cls;
    }


    private static List<OperatorProfiler.Result> profile(SourceProfiler sourceProfiler, Collection<Integer> cardinalities) {
        return cardinalities.stream()
                .map(cardinality -> profile(sourceProfiler, cardinality))
                .collect(Collectors.toList());
    }

    private static OperatorProfiler.Result profile(SourceProfiler sourceProfiler, int cardinality) {
        System.out.println("Running garbage collector...");
        for (int i = 0; i < GC_RUNS; i++) {
            System.gc();
        }
        ProfilingUtils.sleep(1000);

        System.out.printf("Profiling %s with %d data quanta.\n", sourceProfiler, cardinality);
        final StopWatch stopWatch = createStopWatch();

        System.out.println("Prepare...");
        final TimeMeasurement preparation = stopWatch.start("Preparation");
        sourceProfiler.prepare(cardinality);
        preparation.stop();

        System.out.println("Execute...");
        final TimeMeasurement execution = stopWatch.start("Execution");
        final OperatorProfiler.Result result = sourceProfiler.run();
        execution.stop();

        System.out.println("Measurement:");
        System.out.println(result);
        System.out.println(stopWatch.toPrettyString());
        System.out.println();

        return result;
    }

    private static List<OperatorProfiler.Result> profile(UnaryOperatorProfiler unaryOperatorProfiler, Collection<Integer> cardinalities) {
        return cardinalities.stream()
                .map(cardinality -> profile(unaryOperatorProfiler, cardinality))
                .collect(Collectors.toList());
    }

    private static OperatorProfiler.Result profile(UnaryOperatorProfiler unaryOperatorProfiler, int cardinality) {
        System.out.println("Running garbage collector...");
        for (int i = 0; i < GC_RUNS; i++) {
            System.gc();
        }
        ProfilingUtils.sleep(1000);

        System.out.printf("Profiling %s with %d data quanta.\n", unaryOperatorProfiler, cardinality);
        final StopWatch stopWatch = createStopWatch();

        System.out.println("Prepare...");
        final TimeMeasurement preparation = stopWatch.start("Preparation");
        unaryOperatorProfiler.prepare(cardinality);
        preparation.stop();

        System.out.println("Execute...");
        final TimeMeasurement execution = stopWatch.start("Execution");
        final OperatorProfiler.Result result = unaryOperatorProfiler.run();
        execution.stop();

        System.out.println("Measurement:");
        System.out.println(result);
        System.out.println(stopWatch.toPrettyString());
        System.out.println();

        return result;
    }

    private static List<OperatorProfiler.Result> profile(BinaryOperatorProfiler binaryOperatorProfiler,
                                                         Collection<Integer> cardinalities0,
                                                         Collection<Integer> cardinalities1) {
        return cardinalities0.stream()
                .flatMap(cardinality0 ->
                        cardinalities1.stream()
                                .map(
                                        cardinality1 -> profile(binaryOperatorProfiler, cardinality0, cardinality1)
                                )
                )
                .collect(Collectors.toList());
    }

    private static OperatorProfiler.Result profile(BinaryOperatorProfiler binaryOperatorProfiler,
                                                   int cardinality0,
                                                   int cardinality1) {
        System.out.println("Running garbage collector...");
        for (int i = 0; i < GC_RUNS; i++) {
            System.gc();
        }
        ProfilingUtils.sleep(1000);

        System.out.printf("Profiling %s with %dx%d data quanta.\n", binaryOperatorProfiler.getOperator(), cardinality0, cardinality1);
        final StopWatch stopWatch = createStopWatch();

        System.out.println("Prepare...");
        final TimeMeasurement preparation = stopWatch.start("Preparation");
        binaryOperatorProfiler.prepare(cardinality0, cardinality1);
        preparation.stop();

        System.out.println("Execute...");
        final TimeMeasurement execution = stopWatch.start("Execution");
        final OperatorProfiler.Result result = binaryOperatorProfiler.run();
        execution.stop();

        System.out.println("Measurement:");
        System.out.println(result);
        System.out.println(stopWatch.toPrettyString());
        System.out.println();

        return result;
    }

    private static List<OperatorProfiler.Result> profile(SinkProfiler sinkProfiler, Collection<Integer> cardinalities) {
        return cardinalities.stream()
                .map(cardinality -> profile(sinkProfiler, cardinality))
                .collect(Collectors.toList());
    }

    private static OperatorProfiler.Result profile(SinkProfiler sinkProfiler, int cardinality) {
        System.out.println("Running garbage collector...");
        for (int i = 0; i < GC_RUNS; i++) {
            System.gc();
        }
        ProfilingUtils.sleep(1000);

        System.out.printf("Profiling %s with %d data quanta.\n", sinkProfiler, cardinality);
        final StopWatch stopWatch = createStopWatch();

        System.out.println("Prepare...");
        final TimeMeasurement preparation = stopWatch.start("Preparation");
        sinkProfiler.prepare(cardinality);
        preparation.stop();

        System.out.println("Execute...");
        final TimeMeasurement execution = stopWatch.start("Execution");
        final OperatorProfiler.Result result = sinkProfiler.run();
        execution.stop();

        System.out.println("Measurement:");
        System.out.println(result);
        System.out.println(stopWatch.toPrettyString());
        System.out.println();

        return result;
    }

}
