/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.wayang.tests;

import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.wayang.basic.data.Tuple2;
import org.apache.wayang.basic.operators.CartesianOperator;
import org.apache.wayang.basic.operators.CoGroupOperator;
import org.apache.wayang.basic.operators.CollectionSource;
import org.apache.wayang.basic.operators.CountOperator;
import org.apache.wayang.basic.operators.DistinctOperator;
import org.apache.wayang.basic.operators.FilterOperator;
import org.apache.wayang.basic.operators.FlatMapOperator;
import org.apache.wayang.basic.operators.JoinOperator;
import org.apache.wayang.basic.operators.LocalCallbackSink;
import org.apache.wayang.basic.operators.MapOperator;
import org.apache.wayang.basic.operators.MapPartitionsOperator;
import org.apache.wayang.basic.operators.ReduceByOperator;
import org.apache.wayang.basic.operators.SortOperator;
import org.apache.wayang.basic.operators.TextFileSink;
import org.apache.wayang.basic.operators.TextFileSource;
import org.apache.wayang.basic.operators.UnionAllOperator;
import org.apache.wayang.basic.operators.ZipWithIdOperator;
import org.apache.wayang.core.function.FunctionDescriptor;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.util.ReflectionUtils;
import org.apache.wayang.core.util.WayangArrays;

/**
 * Provides plans that can be used for integration testing..
 */
public class WayangPlansOperators extends WayangPlans{

    public static WayangPlan cartesian(URI inputFileUri1, URI inputFileUri2, List<Tuple2<String, String>> collector){
        TextFileSource fileSource1 = new TextFileSource(inputFileUri1.toString());
        TextFileSource fileSource2 = new TextFileSource(inputFileUri2.toString());

        CartesianOperator<String, String> cartesianOperator = new CartesianOperator<String, String>(String.class, String.class);

        LocalCallbackSink<Tuple2<String, String>> sink = LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        fileSource1.connectTo(0, cartesianOperator, 0);
        fileSource2.connectTo(0, cartesianOperator, 1);
        cartesianOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan coGroup(URI inputFileUri1, URI inputFileUri2, List<Tuple2<?, ?>> collector){
        TextFileSource fileSource1 = new TextFileSource(inputFileUri1.toString());
        TextFileSource fileSource2 = new TextFileSource(inputFileUri2.toString());

        FunctionDescriptor.SerializableFunction<String, Tuple2<String, Integer>> mapFunction =
                line -> {
                    String[] split = line.split(" ");
                    return new Tuple2<>(split[0], Integer.parseInt(split[1]));
                };

        MapOperator<String, Tuple2<String, Integer>> mapOperator1 = new MapOperator<String, Tuple2<String, Integer>>(
                mapFunction,
                String.class,
                ReflectionUtils.specify(Tuple2.class)
        );

        MapOperator<String, Tuple2<String, Integer>> mapOperator2 = new MapOperator<String, Tuple2<String, Integer>>(
                mapFunction,
                String.class,
                ReflectionUtils.specify(Tuple2.class)
        );


        FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> keyExtractor =
                element -> {
                    return element.field0;
                };

        CoGroupOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, String> coGroupOperator =
                new CoGroupOperator<>(
                        keyExtractor,
                        keyExtractor,
                        ReflectionUtils.specify(Tuple2.class),
                        ReflectionUtils.specify(Tuple2.class),
                        String.class
                );

        LocalCallbackSink<Tuple2<?, ?>> sink =
                LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        fileSource1.connectTo(0, mapOperator1, 0);
        fileSource2.connectTo(0, mapOperator2, 0);
        mapOperator1.connectTo(0, coGroupOperator, 0);
        mapOperator2.connectTo(0, coGroupOperator, 1);
        coGroupOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan collectionSourceOperator(Collection<String> source, Collection<String> collector){
        CollectionSource<String> colSource = new CollectionSource<String>(source, String.class);

        LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(collector, String.class);

        colSource.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan count(Collection<String> source, Collection<Long> collector){
        CollectionSource<String> colSource = new CollectionSource<String>(source, String.class);

        CountOperator<String> countOperator = new CountOperator<String>(String.class);

        LocalCallbackSink<Long> sink = LocalCallbackSink.createCollectingSink(collector, Long.class);

        colSource.connectTo(0, countOperator, 0);
        countOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan distinct(URI inputFileUri1, Collection<String> collector){
        TextFileSource source = new TextFileSource(inputFileUri1.toString());

        MapOperator<String, String> mapOperator = new MapOperator<String, String>(
                line -> {
                    return line.toLowerCase();
                },
                String.class,
                String.class
        );

        DistinctOperator<String> distinctOperator = new DistinctOperator<String>(String.class);

        LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(collector, String.class);

        source.connectTo(0, mapOperator, 0);
        mapOperator.connectTo(0, distinctOperator, 0);
        distinctOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan filter(URI inputFileUri1, Collection<String> collector){
        TextFileSource source = new TextFileSource(inputFileUri1.toString());

        FilterOperator<String> filterOperator = new FilterOperator<String>(
                line -> {
                    return line.contains("line");
                },
                String.class
        );

        LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(collector, String.class);

        source.connectTo(0, filterOperator, 0);
        filterOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan flatMap(URI inputFileUri1, Collection<String> collector){
        TextFileSource source = new TextFileSource(inputFileUri1.toString());

        FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<String, String>(
                line -> {
                    return Arrays.asList(line.split(" "));
                },
                String.class,
                String.class
        );

        LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(collector, String.class);

        source.connectTo(0, flatMapOperator, 0);
        flatMapOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan join(URI inputFileUri1, URI inputFileUri2, Collection<Tuple2<?, ?>> collector){
        TextFileSource source1 = new TextFileSource(inputFileUri1.toString());
        TextFileSource source2 = new TextFileSource(inputFileUri2.toString());


        FunctionDescriptor.SerializableFunction<String, Tuple2<String, String>> mapFunction =
                line -> {
                    String[] split = line.split(" ");
                    return new Tuple2<>(split[0], split[1]);
                };

        MapOperator<String, Tuple2<String, String>> mapOperator1 = new MapOperator<String, Tuple2<String, String>>(
                mapFunction,
                String.class,
                ReflectionUtils.specify(Tuple2.class)
        );

        MapOperator<String, Tuple2<String, String>> mapOperator2 = new MapOperator<String, Tuple2<String, String>>(
                mapFunction,
                String.class,
                ReflectionUtils.specify(Tuple2.class)
        );

        FunctionDescriptor.SerializableFunction<Tuple2<String, String>, String> keyFunction = tuple -> tuple.field0;

        JoinOperator<Tuple2<String, String>, Tuple2<String, String>, String> joinOperator = new JoinOperator<Tuple2<String, String>, Tuple2<String, String>, String>(
                keyFunction,
                keyFunction,
                ReflectionUtils.specify(Tuple2.class),
                ReflectionUtils.specify(Tuple2.class),
                String.class
        );

        LocalCallbackSink<Tuple2<?, ?>> sink = LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        source1.connectTo(0, mapOperator1, 0);
        source2.connectTo(0, mapOperator2, 0);
        mapOperator1.connectTo(0, joinOperator, 0);
        mapOperator2.connectTo(0, joinOperator, 1);
        joinOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }


    public static WayangPlan reduceBy(URI inputFileUri1, Collection<Tuple2<?, ?>> collector){
        TextFileSource source = new TextFileSource(inputFileUri1.toString());

        FunctionDescriptor.SerializableFunction<String, Tuple2<String, String>> mapFunction =
            line -> {
                String[] split = line.split(" ");
                return new Tuple2<>(split[0], split[1]);
            };

        MapOperator<String, Tuple2<String, String>> mapOperator = new MapOperator<String, Tuple2<String, String>>(
                mapFunction,
                String.class,
                ReflectionUtils.specify(Tuple2.class)
        );


        FunctionDescriptor.SerializableFunction<Tuple2<String, String>, String> keyFunction = tuple -> tuple.field0;

        ReduceByOperator<Tuple2<String, String>, String> reduceByOperator = new ReduceByOperator<Tuple2<String, String>, String>(
                keyFunction,
                (tuple, tuple2) -> {
                    return new Tuple2<>(tuple.field0, tuple.field1+" - "+tuple2.field1);
                },
                String.class,
                ReflectionUtils.specify(Tuple2.class)
        );

        LocalCallbackSink<Tuple2<?, ?>> sink = LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        source.connectTo(0, mapOperator, 0);
        mapOperator.connectTo(0, reduceByOperator, 0);
        reduceByOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan sort(URI inputFileUri1, Collection<String> collector){
        TextFileSource source = new TextFileSource(inputFileUri1.toString());

        FunctionDescriptor.SerializableFunction<String, Iterable<String>> flatMapFunction =
                line -> {
                    return  Arrays.asList(line.split(" "));
                };

        FlatMapOperator<String, String> flatMapOperator = new FlatMapOperator<String, String>(
                flatMapFunction,
                String.class,
                String.class
        );

        SortOperator<String, String> sortOperator = new SortOperator<String, String>(
                word -> word,
                String.class,
                String.class
        );


        LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(collector, String.class);

        source.connectTo(0, flatMapOperator, 0);
        flatMapOperator.connectTo(0, sortOperator, 0);
        sortOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan textFileSink(URI inputFileUri1, URI outputFileUri1){
        TextFileSource source = new TextFileSource(inputFileUri1.toString());

        TextFileSink<String> sink = new TextFileSink<String>(outputFileUri1.toString(), String.class);

        source.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan union(URI inputFileUri1, URI inputFileUri2, Collection<String> collector){
        TextFileSource source1 = new TextFileSource(inputFileUri1.toString());
        TextFileSource source2 = new TextFileSource(inputFileUri2.toString());

        UnionAllOperator<String> unionAllOperator = new UnionAllOperator<String>(String.class);

        LocalCallbackSink<String> sink = LocalCallbackSink.createCollectingSink(collector, String.class);

        source1.connectTo(0, unionAllOperator, 0);
        source2.connectTo(0, unionAllOperator, 1);
        unionAllOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan zipWithId(URI inputFileUri1, Collection<Tuple2<Long, String>> collector){
        TextFileSource source1 = new TextFileSource(inputFileUri1.toString());

        ZipWithIdOperator<String> zipWithIdOperator = new ZipWithIdOperator<String>(String.class);

        LocalCallbackSink<Tuple2<Long, String>> sink = LocalCallbackSink.createCollectingSink(collector, ReflectionUtils.specify(Tuple2.class));

        source1.connectTo(0, zipWithIdOperator, 0);
        zipWithIdOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);
    }

    public static WayangPlan mapPartitions(Collection<Tuple2<String, Integer>> collector, int... inputValues) {
        CollectionSource<Integer> source = new CollectionSource<Integer>(WayangArrays.asList(inputValues), Integer.class);

        MapPartitionsOperator<Integer, Tuple2<String, Integer>> mapPartition = new MapPartitionsOperator<Integer, Tuple2<String, Integer>>(
                partition -> {
                    int numEvens = 0, numOdds = 0;
                    for (Integer value : partition) {
                        if ((value & 1) == 0) numEvens++;
                        else numOdds++;
                    }
                    return Arrays.asList(
                            new Tuple2<>("odd", numOdds),
                            new Tuple2<>("even", numEvens)
                    );
                },
                Integer.class,
                ReflectionUtils.specify(Tuple2.class)
        );

        FunctionDescriptor.SerializableFunction<Tuple2<String, Integer>, String> keyFunction = tuple -> tuple.field0;

        ReduceByOperator<Tuple2<String, Integer>, String> reduceByOperator = new ReduceByOperator<Tuple2<String, Integer>, String>(
                keyFunction,
                (t1, t2) -> {
                    return new Tuple2<>(t1.getField0(), t1.getField1() + t2.getField1());
                },
                String.class,
                ReflectionUtils.specify(Tuple2.class)
        );

        LocalCallbackSink<Tuple2<String, Integer>> sink = LocalCallbackSink.createCollectingSink(collector,  ReflectionUtils.specify(Tuple2.class));


        source.connectTo(0, mapPartition, 0);
        mapPartition.connectTo(0, reduceByOperator, 0);
        reduceByOperator.connectTo(0, sink, 0);

        return new WayangPlan(sink);

    }
}
