/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.tests;

import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.wayang.api.DataQuantaBuilder;
import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.CollectionSource;
import org.apache.wayang.basic.operators.CountOperator;
import org.apache.wayang.basic.operators.DistinctOperator;
import org.apache.wayang.basic.operators.DoWhileOperator;
import org.apache.wayang.basic.operators.FilterOperator;
import org.apache.wayang.basic.operators.FlatMapOperator;
import org.apache.wayang.basic.operators.GlobalMaterializedGroupOperator;
import org.apache.wayang.basic.operators.IntersectOperator;
import org.apache.wayang.basic.operators.LocalCallbackSink;
import org.apache.wayang.basic.operators.LoopOperator;
import org.apache.wayang.basic.operators.MapOperator;
import org.apache.wayang.basic.operators.MapPartitionsOperator;
import org.apache.wayang.basic.operators.PageRankOperator;
import org.apache.wayang.basic.operators.RepeatOperator;
import org.apache.wayang.basic.operators.SampleOperator;
import org.apache.wayang.basic.operators.SortOperator;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.basic.operators.UnionAllOperator;
import org.apache.wayang.basic.operators.ZipWithIdOperator;
import org.apache.wayang.basic.types.RecordType;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.function.ExecutionContext;
import org.apache.wayang.core.function.FlatMapDescriptor;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.function.PredicateDescriptor;
import org.apache.wayang.core.function.TransformationDescriptor;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.core.types.DataUnitType;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.Tuple;
import org.apache.wayang.core.util.WayangArrays;
import org.apache.wayang.spark.operators.SparkShufflePartitionSampleOperator;
import org.apache.wayang.sqlite3.Sqlite3;
import org.apache.wayang.sqlite3.operators.Sqlite3TableSource;

/**
 * Provides plans that can be used for integration testing.
 */
public class WayangPlans {

    public static final URI FILE_SOME_LINES_TXT = createUri("/some-lines.input");

    public static final URI FILE_OTHER_LINES_TXT = createUri("/other-lines.input");

    public static final URI ULYSSES_TXT = createUri("/ulysses.input");

    public static final URI FILE_WITH_KEY_1 = createUri("/lines-with-key1.input");

    public static final URI FILE_WITH_KEY_2 = createUri("/lines-with-key2.input");

    public static URI createUri(String resourcePath) {
        try {
            return WayangPlans.class.getResource(resourcePath).toURI();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Illegal URI.", e);
        }

    }

    /**
     * Creates a {@link WayangPlan} consisting of a {@link TextFileSource} and a {@link LocalCallbackSink}.
     */
    public static WayangPlan readWrite(URI inputFileUri, List<String> collector) {
        TextFileSource textFileSource = new TextFileSource(inputFileUri.toString());
        LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(collector, String.class);
        textFileSource.connectTo(0, sink, 0);
        return new WayangPlan(sink);
    }

    /**
     * Creates a {@link WayangPlan} consisting of a {@link TextFileSource}, a {@link MapOperator} (performs
     * {@link String#toUpperCase()}), and a {@link LocalCallbackSink}.
     */
    public static WayangPlan readTransformWrite(URI inputFileUri) {
        TextFileSource textFileSource = new TextFileSource(inputFileUri.toString());
        MapOperator<String, String> reverseOperator = new MapOperator<>(
                String::toUpperCase, String.class, String.class
        );
        textFileSource.connectTo(0, reverseOperator, 0);
        LocalCallbackSink<String> stdoutSink = LocalCallbackSink.createStdoutSink(String.class);
        reverseOperator.connectTo(0, stdoutSink, 0);
        WayangPlan wayangPlan = new WayangPlan();
        wayangPlan.addSink(stdoutSink);
        return wayangPlan;
    }

    /**
     * Creates a {@link WayangPlan} with two {@link CollectionSource}s and two {@link LocalCallbackSink}s. Both sources
     * go into a {@link UnionAllOperator} and for the first {@link LocalCallbackSink}, the data quanta are routed
     * via a {@link MapOperator} that applies {@link String#toUpperCase()}.
     */
    public static WayangPlan multiSourceMultiSink(List<String> inputList1, List<String> inputList2,
                                                 List<String> collector1, List<String> collector2) {
        CollectionSource<String> source1 = new CollectionSource<>(inputList1, String.class);
        source1.setName("source1");
        CollectionSource<String> source2 = new CollectionSource<>(inputList2, String.class);
        source2.setName("source2");

        UnionAllOperator<String> coalesceOperator = new UnionAllOperator<>(String.class);
        coalesceOperator.setName("source1+2");
        source1.connectTo(0, coalesceOperator, 0);
        source2.connectTo(0, coalesceOperator, 1);

        MapOperator<String, String> uppercaseOperator = new MapOperator<>(
                String::toUpperCase, String.class, String.class
        );
        uppercaseOperator.setName("uppercase");
        coalesceOperator.connectTo(0, uppercaseOperator, 0);

        LocalCallbackSink<String> sink1 = LocalCallbackSink.createCollectingSink(collector1, String.class);
        sink1.setName("sink1");
        uppercaseOperator.connectTo(0, sink1, 0);

        LocalCallbackSink<String> sink2 = LocalCallbackSink.createCollectingSink(collector2, String.class);
        sink2.setName("sink2");
        coalesceOperator.connectTo(0, sink2, 0);

        return new WayangPlan(sink1, sink2);
    }

    /**
     * Creates a {@link WayangPlan} with two {@link CollectionSource}s and two {@link LocalCallbackSink}s. Both sources
     * go into a {@link UnionAllOperator}. Then, the data flow diverges again and to the branches one {@link MapOperator}
     * is applied with {@link String#toUpperCase()} and {@link String#toLowerCase()}. Finally, the both branches
     * are united via another {@link UnionAllOperator}, which is in turn consumed by the two {@link LocalCallbackSink}s.
     */
    public static WayangPlan multiSourceHoleMultiSink(List<String> inputList1, List<String> inputList2,
                                                     List<String> collector1, List<String> collector2) {

        CollectionSource<String> source1 = new CollectionSource<>(inputList1, String.class);
        source1.setName("source1");
        CollectionSource<String> source2 = new CollectionSource<>(inputList2, String.class);
        source2.setName("source2");

        UnionAllOperator<String> coalesceOperator1 = new UnionAllOperator<>(String.class);
        coalesceOperator1.setName("union1");
        source1.connectTo(0, coalesceOperator1, 0);
        source2.connectTo(0, coalesceOperator1, 1);

        MapOperator<String, String> lowerCaseOperator = new MapOperator<>(
                String::toLowerCase, String.class, String.class
        );
        lowerCaseOperator.setName("toLowerCase");
        coalesceOperator1.connectTo(0, lowerCaseOperator, 0);

        MapOperator<String, String> upperCaseOperator = new MapOperator<>(
                String::toUpperCase, String.class, String.class
        );
        upperCaseOperator.setName("toUpperCase");
        coalesceOperator1.connectTo(0, upperCaseOperator, 0);

        UnionAllOperator<String> coalesceOperator2 = new UnionAllOperator<>(String.class);
        coalesceOperator2.setName("union2");
        lowerCaseOperator.connectTo(0, coalesceOperator2, 0);
        upperCaseOperator.connectTo(0, coalesceOperator2, 1);

        LocalCallbackSink<String> sink1 = LocalCallbackSink.createCollectingSink(collector1, String.class);
        sink1.setName("sink1");
        coalesceOperator2.connectTo(0, sink1, 0);

        LocalCallbackSink<String> sink2 = LocalCallbackSink.createCollectingSink(collector2, String.class);
        sink2.setName("sink2");
        coalesceOperator2.connectTo(0, sink2, 0);

        return new WayangPlan(sink1, sink2);
    }

    /**
     * Creates a {@link WayangPlan} with a {@link TextFileSource}, a {@link SortOperator}, a {@link MapOperator},
     * a {@link DistinctOperator}, a {@link CountOperator}, and finally a {@link LocalCallbackSink} (stdout).
     */
    public static WayangPlan diverseScenario1(URI inputFileUri) {

        // Build a Wayang plan.
        TextFileSource textFileSource = new TextFileSource(inputFileUri.toString());
        textFileSource.setName("Load input file");
        SortOperator<String, String> sortOperator = new SortOperator<>(in->in, String.class, String.class);
        sortOperator.setName("Sort lines");
        MapOperator<String, String> upperCaseOperator = new MapOperator<>(
                String::toUpperCase, String.class, String.class
        );
        upperCaseOperator.setName("To uppercase");
        DistinctOperator<String> distinctLinesOperator = new DistinctOperator<>(String.class);
        distinctLinesOperator.setName("Make lines distinct");
        CountOperator<String> countLinesOperator = new CountOperator<>(String.class);
        countLinesOperator.setName("Count lines");
        LocalCallbackSink<Long> stdoutSink = LocalCallbackSink.createStdoutSink(Long.class);
        stdoutSink.setName("Print count");

        textFileSource.connectTo(0, sortOperator, 0);
        sortOperator.connectTo(0, upperCaseOperator, 0);
        upperCaseOperator.connectTo(0, distinctLinesOperator, 0);
        distinctLinesOperator.connectTo(0, countLinesOperator, 0);
        countLinesOperator.connectTo(0, stdoutSink, 0);

        return new WayangPlan(stdoutSink);
    }

    /**
     * Creates a {@link WayangPlan} with two {@link TextFileSource}s, of which the first goes through a {@link FilterOperator}
     * Then, they are unioned in a {@link UnionAllOperator}, go through a {@link SortOperator}, a {@link MapOperator}
     * (applies {@link String#toUpperCase()}), {@link DistinctOperator}, and finally a {@link LocalCallbackSink} (stdout).
     */
    public static WayangPlan diverseScenario2(URI inputFileUri1, URI inputFileUri2) throws URISyntaxException {
        // Build a Wayang plan.
        TextFileSource textFileSource1 = new TextFileSource(inputFileUri1.toString());
        TextFileSource textFileSource2 = new TextFileSource(inputFileUri2.toString());
        FilterOperator<String> noCommaOperator = new FilterOperator<>(s -> !s.contains(","), String.class);
        MapOperator<String, String> upperCaseOperator = new MapOperator<>(
                String::toUpperCase, String.class, String.class
        );
        UnionAllOperator<String> unionOperator = new UnionAllOperator<>(String.class);
        SortOperator<String, String> sortOperator = new SortOperator<>(r->r, String.class, String.class);
        DistinctOperator<String> distinctLinesOperator = new DistinctOperator<>(String.class);
        LocalCallbackSink<String> stdoutSink = LocalCallbackSink.createStdoutSink(String.class);

        // Read from file 1, remove commas, union with file 2, sort, upper case, then remove duplicates and output.
        textFileSource1.connectTo(0, noCommaOperator, 0);
        textFileSource2.connectTo(0, unionOperator, 0);
        noCommaOperator.connectTo(0, unionOperator, 1);
        unionOperator.connectTo(0, sortOperator, 0);
        sortOperator.connectTo(0, upperCaseOperator, 0);
        upperCaseOperator.connectTo(0, distinctLinesOperator, 0);
        distinctLinesOperator.connectTo(0, stdoutSink, 0);

        return new WayangPlan(stdoutSink);
    }

    /**
     * Creates a {@link WayangPlan} with a {@link CollectionSource} that is fed into a {@link LoopOperator}. It will
     * then {@code k} times map each value to {@code 2n} and {@code 2n+1}. Finally, the outcome of the loop is
     * collected in the {@code collector}.
     */
    public static WayangPlan simpleLoop(final int numIterations, Collection<Integer> collector, final int... values)
            throws URISyntaxException {
        CollectionSource<Integer> source = new CollectionSource<>(WayangArrays.asList(values), Integer.class);
        source.setName("source");

        CollectionSource<Integer> convergenceSource = new CollectionSource<>(WayangArrays.asList(0), Integer.class);
        convergenceSource.setName("convergenceSource");


        LoopOperator<Integer, Integer> loopOperator = new LoopOperator<>(DataSetType.createDefault(Integer.class),
                DataSetType.createDefault(Integer.class),
                (PredicateDescriptor.SerializablePredicate<Collection<Integer>>) collection ->
                        collection.iterator().next() >= numIterations,
                numIterations
        );
        loopOperator.setName("loop");
        loopOperator.initialize(source, convergenceSource);

        FlatMapOperator<Integer, Integer> stepOperator = new FlatMapOperator<>(
                val -> Arrays.asList(2 * val, 2 * val + 1),
                Integer.class,
                Integer.class
        );
        stepOperator.setName("step");

        MapOperator<Integer, Integer> counter = new MapOperator<>(
                new TransformationDescriptor<>(n -> n + 1, Integer.class, Integer.class)
        );
        counter.setName("counter");
        loopOperator.beginIteration(stepOperator, counter);
        loopOperator.endIteration(stepOperator, counter);

        LocalCallbackSink<Integer> sink = LocalCallbackSink.createCollectingSink(collector, Integer.class);
        sink.setName("sink");
        loopOperator.outputConnectTo(sink);

        // Create the WayangPlan.
        return new WayangPlan(sink);
    }

    /**
     * Creates a {@link WayangPlan} that goes through a loop thereby incorporating the iteration number.
     */
    public static Collection<Integer> loopWithIterationNumber(WayangContext wayangContext,
                                                              final int maxValue,
                                                              final int expectedNumIterations,
                                                              final int... values) {
        return new JavaPlanBuilder(wayangContext)
                .loadCollection(WayangArrays.asList(values)).withName("Load values")
                .doWhile(
                        vals -> {
                            for (Integer val : vals) {
                                if (val >= maxValue) return true;
                            }
                            return false;
                        },
                        loopHead -> {
                            DataQuantaBuilder<?, Integer> newVals = loopHead
                                    .map(new IncreaseByIterationNumber())
                                    .withName("Increase by iteration number");
                            return new Tuple<>(
                                    newVals.map(x -> x).withName("Identity 1").withOutputClass(Integer.class),
                                    newVals.map(x -> x).withName("Identity 2").withOutputClass(Integer.class)
                            );
                        }
                ).withExpectedNumberOfIterations(expectedNumIterations).withConditionClass(Integer.class)
                .collect();
    }

    /**
     * Increases all incoming {@link Integer}s by the current iteration number.
     */
    public static class IncreaseByIterationNumber
            implements FunctionDescriptor.ExtendedSerializableFunction<Integer, Integer> {

        private int increment;

        @Override
        public void open(ExecutionContext ctx) {
            this.increment = ctx.getCurrentIteration();
        }

        @Override
        public Integer apply(Integer integer) {
            return integer + this.increment;
        }
    }

    /**
     * Creates a {@link WayangPlan} with a {@link CollectionSource} that is fed into a {@link SampleOperator}. It will
     * then map each value to its double and output the results in the {@code collector}.
     */
    public static WayangPlan simpleSample(int sampleSize, Collection<Integer> collector, final int... values)
            throws URISyntaxException {
        CollectionSource<Integer> source = new CollectionSource<>(WayangArrays.asList(values), Integer.class);
        source.setName("source");

        SampleOperator<Integer> sampleOperator = new SampleOperator<>(
                sampleSize, DataSetType.createDefault(Integer.class), SampleOperator.Methods.RANDOM, SampleOperator.randomSeed()
        );
        sampleOperator.setName("sample");

        MapOperator<Integer, Integer> mapOperator = new MapOperator<>(n -> 2 * n, Integer.class, Integer.class);
        mapOperator.setName("map");

        LocalCallbackSink<Integer> sink = LocalCallbackSink.createCollectingSink(collector, Integer.class);
        sink.setName("sink");

        source.connectTo(0, sampleOperator, 0);
        sampleOperator.connectTo(0, mapOperator, 0);
        mapOperator.connectTo(0, sink, 0);

        // Create the WayangPlan.
        return new WayangPlan(sink);
    }

    public static WayangPlan sampleInLoop(int sampleSize, int iterations, Collection<Integer> collector, final int... inputValues) {

        // Prepare test data.
        CollectionSource<Integer> source = new CollectionSource<>(WayangArrays.asList(inputValues), Integer.class);
        source.setName("source");

        CollectionSource<Integer> convergenceSource = new CollectionSource<>(WayangArrays.asList(0), Integer.class);
        convergenceSource.setName("convergenceSource");


        LoopOperator<Integer, Integer> loopOperator = new LoopOperator<>(DataSetType.createDefault(Integer.class),
                DataSetType.createDefault(Integer.class),
                (PredicateDescriptor.SerializablePredicate<Collection<Integer>>) collection ->
                        collection.iterator().next() >= iterations, iterations
        );
        loopOperator.setName("loop");
        loopOperator.initialize(source, convergenceSource);

        // Build the sample operator.
        SparkShufflePartitionSampleOperator<Integer> sampleOperator =
                new SparkShufflePartitionSampleOperator<>(
                        iterationNumber -> sampleSize,
                        DataSetType.createDefaultUnchecked(Integer.class),
                        iterationNumber -> 42 + iterationNumber
                );
        sampleOperator.setDatasetSize(10);
        sampleOperator.setName("sample");

        MapOperator<Integer, Integer> counter = new MapOperator<>(
                new TransformationDescriptor<>(n -> n + 1, Integer.class, Integer.class)
        );
        counter.setName("counter");
        loopOperator.beginIteration(sampleOperator, counter);
        loopOperator.endIteration(sampleOperator, counter);

        LocalCallbackSink<Integer> sink = LocalCallbackSink.createCollectingSink(collector, Integer.class);
        sink.setName("sink");
        loopOperator.outputConnectTo(sink);

        return new WayangPlan(sink);
    }

    /**
     * Creates a {@link WayangPlan} with a {@link CollectionSource} that is fed into a {@link GlobalMaterializedGroupOperator}.
     * It will then push the results in the {@code collector}.
     */
    public static WayangPlan globalMaterializedGroup(Collection<Iterable<Integer>> collector, final int... values)
            throws URISyntaxException {
        CollectionSource<Integer> source = new CollectionSource<>(WayangArrays.asList(values), Integer.class);
        source.setName("source");

        GlobalMaterializedGroupOperator<Integer> globalMaterializedGroupOperator =
                new GlobalMaterializedGroupOperator<>(Integer.class);
        globalMaterializedGroupOperator.setName("group");

        LocalCallbackSink<Iterable<Integer>> sink = LocalCallbackSink.createCollectingSink(
                collector,
                DataSetType.createGrouped(Integer.class)
        );
        sink.setName("sink");

        source.connectTo(0, globalMaterializedGroupOperator, 0);
        globalMaterializedGroupOperator.connectTo(0, sink, 0);

        // Create the WayangPlan.
        return new WayangPlan(sink);
    }


    /**
     * Creates a {@link WayangPlan} with a {@link CollectionSource} that is fed into a {@link RepeatOperator}.
     * The input values will be incremented by 1 n times.
     * It will then push the results in the {@code collector}.
     */
    public static WayangPlan repeat(Collection<Integer> collector, int numIterations, final int... values) {
        CollectionSource<Integer> source = new CollectionSource<>(WayangArrays.asList(values), Integer.class);
        source.setName("source");

        RepeatOperator<Integer> repeat = new RepeatOperator<>(numIterations, Integer.class);
        repeat.setName("repeat");

        MapOperator<Integer, Integer> increment = new MapOperator<>(
                i -> i + 1, Integer.class, Integer.class
        );
        increment.setName("increment");

        LocalCallbackSink<Integer> sink = LocalCallbackSink.createCollectingSink(
                collector,
                DataSetType.createDefault(Integer.class)
        );
        sink.setName("sink");

        repeat.initialize(source, 0);
        repeat.beginIteration(increment, 0);
        repeat.endIteration(increment, 0);
        repeat.connectFinalOutputTo(sink, 0);

        return new WayangPlan(sink);
    }


    /**
     * Creates a {@link WayangPlan} with a {@link CollectionSource} that is fed into a {@link ZipWithIdOperator}.
     * It will then push the results in the {@code collector}.
     */
    public static WayangPlan zipWithId(Collection<Long> collector, final int... values)
            throws URISyntaxException {
        CollectionSource<Integer> source = new CollectionSource<>(WayangArrays.asList(values), Integer.class);
        source.setName("source");

        ZipWithIdOperator<Integer> zipWithId = new ZipWithIdOperator<>(Integer.class);
        zipWithId.setName("zipWithId");

        MapOperator<Tuple2<Long, Integer>, Long> stripValue = new MapOperator<>(
                tuple -> tuple.field0, ReflectionUtils.specify(Tuple2.class), Long.class
        );
        stripValue.setName("stripValue");

        DistinctOperator<Long> distinctIds = new DistinctOperator<>(Long.class);
        distinctIds.setName("distinctIds");

        CountOperator<Long> count = new CountOperator<>(Long.class);
        count.setName("count");

        LocalCallbackSink<Long> sink = LocalCallbackSink.createCollectingSink(
                collector,
                DataSetType.createDefault(Long.class)
        );
        sink.setName("sink");

        source.connectTo(0, zipWithId, 0);
        zipWithId.connectTo(0, stripValue, 0);
        stripValue.connectTo(0, distinctIds, 0);
        distinctIds.connectTo(0, count, 0);
        count.connectTo(0, sink, 0);

        // Create the WayangPlan.
        return new WayangPlan(sink);
    }


    /**
     * Creates a {@link WayangPlan} with a {@link CollectionSource}. The data quanta are separated into negative and
     * non-negative. Then, their squares are intersected using the {@link IntersectOperator}. The result is
     * pushed to the {@code collector}.
     */
    public static WayangPlan intersectSquares(Collection<Integer> collector, final int... values)
            throws URISyntaxException {

        CollectionSource<Integer> source = new CollectionSource<>(WayangArrays.asList(values), Integer.class);
        source.setName("source");

        FilterOperator<Integer> filterNegative = new FilterOperator<>(i -> i < 0, Integer.class);
        filterNegative.setName("filterNegative");
        source.connectTo(0, filterNegative, 0);

        MapOperator<Integer, Integer> squareNegative = new MapOperator<>(i -> i * i, Integer.class, Integer.class);
        squareNegative.setName("squareNegative");
        filterNegative.connectTo(0, squareNegative, 0);

        FilterOperator<Integer> filterPositive = new FilterOperator<>(i -> i >= 0, Integer.class);
        filterPositive.setName("filterPositive");
        source.connectTo(0, filterPositive, 0);

        MapOperator<Integer, Integer> squarePositive = new MapOperator<>(i -> i * i, Integer.class, Integer.class);
        squarePositive.setName("squarePositive");
        filterPositive.connectTo(0, squarePositive, 0);

        IntersectOperator<Integer> intersect = new IntersectOperator<>(Integer.class);
        intersect.setName("intersect");
        squarePositive.connectTo(0, intersect, 1);
        squareNegative.connectTo(0, intersect, 0);

        LocalCallbackSink<Integer> sink = LocalCallbackSink.createCollectingSink(
                collector,
                Integer.class
        );
        sink.setName("sink");
        intersect.connectTo(0, sink, 0);

        // Create the WayangPlan.
        return new WayangPlan(sink);
    }

    /**
     * Creates a cross-community PageRank Wayang plan, that incorporates the {@link PageRankOperator}.
     */
    public static WayangPlan pageRankWithDictionaryCompression(Collection<Tuple2<Character, Float>> pageRankCollector) {
        // Get some graph data. Use the example from Wikipedia: https://en.wikipedia.org/wiki/PageRank
        Collection<char[]> adjacencies = Arrays.asList(
                new char[]{'B', 'C'},
                new char[]{'C', 'B'},
                new char[]{'D', 'A', 'B'},
                new char[]{'E', 'B', 'D', 'F'},
                new char[]{'F', 'B', 'E'},
                new char[]{'G', 'B', 'E'},
                new char[]{'H', 'B', 'E'},
                new char[]{'I', 'B', 'E'},
                new char[]{'J', 'E'},
                new char[]{'K', 'E'}
        );

        // Create a WayangPlan:

        // Load the adjacency list.
        final CollectionSource<char[]> adjacencySource = new CollectionSource<>(adjacencies, char[].class);
        adjacencySource.setName("adjacency source");

        // Split the adjacency list into an edge list.
        FlatMapOperator<char[], Tuple2<Character, Character>> adjacencySplitter = new FlatMapOperator<>(
                new FlatMapDescriptor<>(
                        (adjacence) -> {
                            List<Tuple2<Character, Character>> result = new ArrayList<>(adjacence.length - 1);
                            for (int i = 1; i < adjacence.length; i++) {
                                result.add(new Tuple2<>(adjacence[0], adjacence[i]));
                            }
                            return result;
                        },
                        char[].class,
                        ReflectionUtils.specify(Tuple2.class))
        );
        adjacencySplitter.setName("adjacency splitter");
        adjacencySource.connectTo(0, adjacencySplitter, 0);

        // Extract the vertices from the edge list.
        FlatMapOperator<Tuple2<Character, Character>, Character> vertexSplitter = new FlatMapOperator<>(
                new FlatMapDescriptor<>(
                        (edge) -> {
                            List<Character> vertices = new ArrayList<>(2);
                            vertices.add(edge.field0);
                            vertices.add(edge.field1);
                            return vertices;
                        },
                        ReflectionUtils.specify(Tuple2.class),
                        Character.class
                )
        );
        vertexSplitter.setName("vertex splitter");
        adjacencySplitter.connectTo(0, vertexSplitter, 0);

        // Find the distinct vertices.
        DistinctOperator<Character> vertexCanonicalizer = new DistinctOperator<>(Character.class);
        vertexCanonicalizer.setName("vertex canonicalizer");
        vertexSplitter.connectTo(0, vertexCanonicalizer, 0);

        // Assign an ID to each distinct vertex.
        ZipWithIdOperator<Character> zipWithId = new ZipWithIdOperator<>(Character.class);
        zipWithId.setName("zip with ID");
        vertexCanonicalizer.connectTo(0, zipWithId, 0);

        // Base the edge list on vertex IDs.
        MapOperator<Tuple2<Character, Character>, Tuple2<Long, Long>> translate = new MapOperator<>(
                new TransformationDescriptor<>(
                        new FunctionDescriptor.ExtendedSerializableFunction<Tuple2<Character, Character>, Tuple2<Long, Long>>() {

                            private Map<Character, Long> dictionary;

                            @Override
                            public void open(ExecutionContext ctx) {
                                this.dictionary = ctx.<Tuple2<Long, Character>>getBroadcast("vertex IDs").stream()
                                        .collect(Collectors.toMap(Tuple2::getField1, Tuple2::getField0));
                            }

                            @Override
                            public Tuple2<Long, Long> apply(Tuple2<Character, Character> in) {
                                return new Tuple2<>(this.dictionary.get(in.field0), this.dictionary.get(in.field1));
                            }
                        },
                        DataUnitType.createBasicUnchecked(Tuple2.class),
                        DataUnitType.createBasicUnchecked(Tuple2.class)
                )
        );
        translate.setName("translate");
        adjacencySplitter.connectTo(0, translate, 0);
        zipWithId.broadcastTo(0, translate, "vertex IDs");

        // Run the PageRank algorithm.
        PageRankOperator pageRank = new PageRankOperator(20);
        pageRank.setName("PageRank");
        translate.connectTo(0, pageRank, 0);

        // Back-translate the page ranks.
        MapOperator<Tuple2<Long, Float>, Tuple2<Character, Float>> backtranslate = new MapOperator<>(
                new TransformationDescriptor<>(
                        new FunctionDescriptor.ExtendedSerializableFunction<Tuple2<Long, Float>, Tuple2<Character, Float>>() {

                            private Map<Long, Character> dictionary;

                            @Override
                            public void open(ExecutionContext ctx) {
                                this.dictionary = ctx.<Tuple2<Long, Character>>getBroadcast("vertex IDs").stream()
                                        .collect(Collectors.toMap(Tuple2::getField0, Tuple2::getField1));
                            }

                            @Override
                            public Tuple2<Character, Float> apply(Tuple2<Long, Float> in) {
                                return new Tuple2<>(this.dictionary.get(in.field0), in.field1);
                            }
                        },
                        DataUnitType.createBasicUnchecked(Tuple2.class),
                        DataUnitType.createBasicUnchecked(Tuple2.class)
                )
        );
        backtranslate.setName("bracktranslate");
        pageRank.connectTo(0, backtranslate, 0);
        zipWithId.broadcastTo(0, backtranslate, "vertex IDs");

        LocalCallbackSink callbackSink = LocalCallbackSink.createCollectingSink(
                pageRankCollector,
                DataSetType.<Tuple2<Character, Float>>createDefaultUnchecked(Tuple2.class)
        );
        callbackSink.setName("sink");
        backtranslate.connectTo(0, callbackSink, 0);

        return new WayangPlan(callbackSink);
    }

    public static Map<Character, Float> pageRankWithDictionaryCompressionSolution() {
        return Stream.of(
                new Tuple2<>('A', 0.033f),
                new Tuple2<>('B', 0.384f),
                new Tuple2<>('C', 0.343f),
                new Tuple2<>('D', 0.039f),
                new Tuple2<>('E', 0.081f),
                new Tuple2<>('F', 0.039f),
                new Tuple2<>('G', 0.016f),
                new Tuple2<>('H', 0.016f),
                new Tuple2<>('I', 0.016f),
                new Tuple2<>('J', 0.016f),
                new Tuple2<>('K', 0.016f)
        ).collect(Collectors.toMap(Tuple2::getField0, Tuple2::getField1));
    }

    /**
     * Feeds the {@code edges} into a {@link PageRankOperator} and collects the page ranks in the {@code collector}.
     *
     * @return a {@link WayangPlan} implementing the above described
     */
    public static WayangPlan pageRank(Collection<Tuple2<Long, Long>> edges,
                                     Collection<Tuple2<Long, Float>> collector) {
        CollectionSource<Tuple2<Long, Long>> source = new CollectionSource<>(
                edges, ReflectionUtils.specify(Tuple2.class)
        );
        source.setName("source");

        PageRankOperator pageRank = new PageRankOperator(20);
        pageRank.setName("pageRank");
        source.connectTo(0, pageRank, 0);

        final LocalCallbackSink<Tuple2<Long, Float>> sink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));
        pageRank.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    /**
     * Creates and executed a {@link WayangPlan} that counts the number of even and odd numbers using a
     * {@link MapPartitionsOperator} to pre-aggregate partitions.
     *
     * @param wayangContext provide the execution environment
     * @param inputValues  that should be dissected and counted
     */
    public static Collection<Tuple2<String, Integer>> mapPartitions(WayangContext wayangContext, int... inputValues) {
        JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);

        // Execute the job.
        return builder
                .loadCollection(WayangArrays.asList(inputValues))
                .mapPartitions(partition -> {
                    int numEvens = 0, numOdds = 0;
                    for (Integer value : partition) {
                        if ((value & 1) == 0) numEvens++;
                        else numOdds++;
                    }
                    return Arrays.asList(
                            new Tuple2<>("odd", numOdds),
                            new Tuple2<>("even", numEvens)
                    );
                })
                .reduceByKey(Tuple2::getField0, (t1, t2) -> new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1()))
                .collect();

    }

    /**
     * Same as scenarion2 but repeat 10 times before output.
     */
    public static WayangPlan diverseScenario3(URI inputFileUri1, URI inputFileUri2) throws URISyntaxException {
        // Build a Wayang plan.
        TextFileSource textFileSource1 = new TextFileSource(inputFileUri1.toString());
        textFileSource1.setName("Source 1");
        TextFileSource textFileSource2 = new TextFileSource(inputFileUri2.toString());
        textFileSource2.setName("Source 2");
        FilterOperator<String> noCommaOperator = new FilterOperator<>(s -> !s.contains(","), String.class);
        noCommaOperator.setName("Filter comma");
        UnionAllOperator<String> unionOperator = new UnionAllOperator<>(String.class);
        unionOperator.setName("Union");
        LocalCallbackSink<String> stdoutSink = LocalCallbackSink.createStdoutSink(String.class);
        stdoutSink.setName("Print");
        SortOperator<String, String> sortOperator = new SortOperator<>(r->r, String.class, String.class);
        sortOperator.setName("Sort");
        CountOperator<String> countLines = new CountOperator<>(String.class);
        countLines.setName("Count");
        DoWhileOperator<String, Long> loopOperator = new DoWhileOperator<>(
                DataSetType.createDefault(String.class),
                DataSetType.createDefault(Long.class),
                integers -> integers.iterator().next() > 100,
                100
        );
        loopOperator.setName("Do while");
        MapOperator<String, String> upperCaseOperator = new MapOperator<>(
                new TransformationDescriptor<>(String::toUpperCase, String.class, String.class)
        );
        upperCaseOperator.setName("To uppercase");
        FilterOperator<String> dummyFilter = new FilterOperator<>(str -> true, String.class);
        dummyFilter.setName("Dummy filter");

        // Read from file 1, remove commas, union with file 2, sort, upper case, then remove duplicates and output.
        loopOperator.initialize(textFileSource1, 0);
        loopOperator.beginIteration(noCommaOperator, 0);
        textFileSource2.connectTo(0, unionOperator, 0);
        noCommaOperator.connectTo(0, unionOperator, 1);
        unionOperator.connectTo(0, sortOperator, 0);
        sortOperator.connectTo(0, countLines, 0);
        sortOperator.connectTo(0, dummyFilter, 0);
        loopOperator.endIteration(dummyFilter, 0, countLines, 0);
        loopOperator.outputConnectTo(upperCaseOperator, 0);
        upperCaseOperator.connectTo(0, stdoutSink, 0);

        // Create the WayangPlan.
        return new WayangPlan(stdoutSink);
    }

    public static Integer increment(Integer k) {
        if (k == null) {
            return 1;
        } else {
            return k++;
        }
    }

    public static String concat9(String k) {
        return k.concat("9");
    }

    /**
     * Simple counter loop .
     */
    public static WayangPlan diverseScenario4(URI inputFileUri1, URI inputFileUri2) throws URISyntaxException {
        // Build a Wayang plan.
        TextFileSource textFileSource1 = new TextFileSource(inputFileUri1.toString());
        textFileSource1.setName("file1");
        TextFileSource textFileSource2 = new TextFileSource(inputFileUri2.toString());
        textFileSource2.setName("file2");
        MapOperator<Integer, Integer> counter = new MapOperator<>(
                new TransformationDescriptor<>(n -> n + 1, Integer.class, Integer.class)
        );
        counter.setName("counter");
        UnionAllOperator<String> unionOperator = new UnionAllOperator<>(String.class);
        unionOperator.setName("union");
        LocalCallbackSink<String> stdoutSink = LocalCallbackSink.createStdoutSink(String.class);
        stdoutSink.setName("stdout");

        LoopOperator<String, Integer> loopOperator = new LoopOperator<>(DataSetType.createDefault(String.class),
                DataSetType.createDefault(Integer.class),
                (PredicateDescriptor.SerializablePredicate<Collection<Integer>>) collection ->
                        collection.iterator().next() >= 10,
                10
        );
        loopOperator.setName("loop");

        // Union 10 times then output
        loopOperator.initialize(textFileSource1, CollectionSource.singleton(0, Integer.class));
        loopOperator.beginIteration(unionOperator, counter);
        textFileSource2.connectTo(0, unionOperator, 1);
        loopOperator.endIteration(unionOperator, counter);
        loopOperator.outputConnectTo(stdoutSink, 0);

        // Create the WayangPlan.
        return new WayangPlan(stdoutSink);
    }

    /**
     * Prepair a SQLite3 database for the {@code sqlite3Scenario*} methods.
     *
     * @param configuration designates the location of the database
     * @throws SQLException
     */
    public static void prepareSqlite3Scenarios(Configuration configuration) throws SQLException {
        try (Connection connection = Sqlite3.platform()
                .createDatabaseDescriptor(configuration)
                .createJdbcConnection()) {
            final Statement statement = connection.createStatement();
            statement.addBatch("DROP TABLE IF EXISTS customer;");
            statement.addBatch("CREATE TABLE customer (name TEXT, age INT);");
            statement.addBatch("INSERT INTO customer VALUES ('John', 20)");
            statement.addBatch("INSERT INTO customer VALUES ('Timmy', 16)");
            statement.addBatch("INSERT INTO customer VALUES ('Evelyn', 35)");
            statement.executeBatch();
        }
    }

    public static List<Record> getSqlite3Customers() {
        return Arrays.asList(
                new Record("John", 20),
                new Record("Timmy", 16),
                new Record("Evelyn", 35)
        );
    }

    public static WayangPlan sqlite3Scenario1(Collection<Record> collector) {
        Sqlite3TableSource customers = new Sqlite3TableSource("customer");
        LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);
        customers.connectTo(0, sink, 0);
        return new WayangPlan(sink);

    }

    public static WayangPlan sqlite3Scenario2(Collection<Record> collector) {
        Sqlite3TableSource customers = new Sqlite3TableSource("customer", "name", "age");
        FilterOperator<Record> filter = new FilterOperator<>(
                new PredicateDescriptor<>(
                        (PredicateDescriptor.SerializablePredicate<Record>) record -> (Integer) record.getField(1) >= 18,
                        Record.class
                ).withSqlImplementation("age >= 18"),
                DataSetType.createDefault(Record.class)
        );
        LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);

        customers.connectTo(0, filter, 0);
        filter.connectTo(0, sink, 0);

        return new WayangPlan(sink);

    }

    public static WayangPlan sqlite3Scenario3(Collection<Record> collector) {
        Sqlite3TableSource customers = new Sqlite3TableSource("customer", "name", "age");
        FilterOperator<Record> filter = new FilterOperator<>(
                new PredicateDescriptor<>(
                        (PredicateDescriptor.SerializablePredicate<Record>) record -> (Integer) record.getField(1) >= 18,
                        Record.class
                ).withSqlImplementation("age >= 18"),
                customers.getType()
        );
        MapOperator<Record, Record> projection = MapOperator.createProjection(
                (RecordType) filter.getOutputType().getDataUnitType(),
                "name"
        );
        LocalCallbackSink<Record> sink = LocalCallbackSink.createCollectingSink(collector, Record.class);

        customers.connectTo(0, filter, 0);
        filter.connectTo(0, projection, 0);
        projection.connectTo(0, sink, 0);

        return new WayangPlan(sink);

    }


}


