/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mrql;

import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
import java.util.Iterator;
import java.io.*;
import java.net.URI;
import java.net.URL;
import org.apache.mrql.gen.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.flink.util.Collector;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.functions.*;
import org.apache.flink.api.common.operators.*;
import org.apache.flink.api.java.*;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.functions.*;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;


/** Evaluates physical plans in Apache Flink mode */
public class FlinkEvaluator extends Evaluator implements Serializable {
    /** location of the code source (jar or directory) from which this class was loaded */
    final static URL flink_jar = FlinkEvaluator.class.getProtectionDomain().getCodeSource().getLocation();
    /** the Flink batch environment; assigned by init in local mode and by initialize_query otherwise */
    public static ExecutionEnvironment flink_env;
    /** the Flink streaming environment; assigned by initialize_query in local mode when Config.stream_window > 0 */
    public static StreamExecutionEnvironment stream_env;
    // an HDFS tmp file used to hold the data source directory information in distributed mode
    static String data_source_dir_name;
    /** host of the Flink application master taken from $FLINK_MASTER; stays "" when that variable is empty */
    static String master_host = "";
    /** port of the Flink application master; 6123 unless overridden from $FLINK_MASTER */
    static int master_port = 6123;
    /** URI prefix of the default file system: "file:///" in local mode, else $FS_DEFAULT_NAME or fs.defaultFS */
    static String fs_default_name;

    /** Initialize the Flink evaluator.
     * Registers the Flink parsers for the "xml", "json" and "line" formats, disables
     * bag materialization, and then sets up the mode-specific state:
     * in local mode it creates the local Flink environment and uses "file:///" as the
     * default file system; in distributed mode it reads the application master address
     * from $FLINK_MASTER (host:port) and the default file system from $FS_DEFAULT_NAME,
     * falling back to the fs.defaultFS entry of Plan.conf when that variable is unset or empty.
     * @param conf the Hadoop configuration (not read by this method)
     * @throws Error if $FLINK_MASTER is undefined or is not of the form host:port
     */
    final public void init ( Configuration conf ) {
        DataSource.parserDirectory = new DataSource.ParserDirectory();
        DataSource.parserDirectory.put("xml",FlinkXMLParser.class);
        DataSource.parserDirectory.put("json",FlinkJsonParser.class);
        DataSource.parserDirectory.put("line",FlinkLineParser.class);
        // Disable bag materialization (it uses the Hadoop HDFS library instead of Flink's)
        Config.max_materialized_bag = Long.MAX_VALUE;
        if (Config.local_mode) {
            flink_env = ExecutionEnvironment.createLocalEnvironment(Config.nodes);
            // currently, the compiler doesn't work in local mode
            Config.compile_functional_arguments = false;
            fs_default_name = "file:///";
            Plan.conf.set("fs.defaultFS",fs_default_name);
        } else if (Config.distributed_mode) {
            String master_node = System.getenv("FLINK_MASTER");
            if (master_node == null)
                throw new Error("Need to run the Flink application master first: $FLINK_HOME/bin/yarn-session.sh");
            if (!master_node.equals("")) {
                String[] m = master_node.split(":");
                if (m.length != 2)
                    throw new Error("Need both host name and port number for the Flink application master: "+master_node);
                master_host = m[0];
                master_port = Integer.parseInt(m[1]);
            }
            // System.getenv returns null for an undefined variable, so test for null before
            // comparing; an undefined variable is treated like an empty one
            final String fs_name = System.getenv("FS_DEFAULT_NAME");
            if (fs_name != null && !fs_name.equals("")) {
                fs_default_name = fs_name;
                Plan.conf.set("fs.defaultFS",fs_default_name);
            } else fs_default_name = Plan.conf.get("fs.defaultFS");
            data_source_dir_name = absolute_path("tmp/data_source_dir.txt");
        }
    }

    /** Shut down the Flink evaluator; this implementation performs no work.
     * @param conf the Hadoop configuration (ignored)
     */
    final public void shutdown ( Configuration conf ) {
        // intentionally empty
    }

    /** Prepare the Flink execution environment before a query is evaluated.
     * In local mode it sets the default local parallelism and, when Config.stream_window
     * is positive, creates the local streaming environment. Otherwise it (re)creates
     * flink_env: a context environment when no master host was configured, or a remote
     * environment connected to master_host:master_port; when functional arguments are
     * compiled, the jar named by the mrql.jar.path entry of Plan.conf is shipped as well.
     * Sysout logging is disabled unless Config.info is set.
     * @throws Error (with the original exception as its cause) if the environment cannot be set up
     */
    final public void initialize_query () {
        try {
            if (Config.local_mode) {
                flink_env.setDefaultLocalParallelism(Config.nodes);
                if (Config.stream_window > 0)
                    stream_env = StreamExecutionEnvironment.createLocalEnvironment(Config.nodes)
                                       .setBufferTimeout(Config.stream_window);
            } else if (Config.compile_functional_arguments) {
                Plan.distribute_compiled_arguments(Plan.conf);
                if (master_host.equals("")) {
                    // no explicit master: reuse the client of the current context environment
                    // but add the jar that holds the compiled functional arguments
                    ContextEnvironment ce = (ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment();
                    List<URL> jars = Arrays.asList(flink_jar,new URL("file://"+Plan.conf.get("mrql.jar.path")));
                    flink_env = new ContextEnvironment(ce.getClient(),jars,new ArrayList<URL>(),
                                                       FlinkEvaluator.class.getClassLoader(),
                                                       Plan.new_path(Plan.conf));
                    flink_env.setParallelism(Config.nodes);
                } else {
                    flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
                                             Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
                }
            } else if (master_host.equals("")) {
                flink_env = ExecutionEnvironment.getExecutionEnvironment();
            } else {
                flink_env = ExecutionEnvironment.createRemoteEnvironment(master_host,master_port,
                                         Config.nodes,flink_jar.toURI().getPath());
            }
            if (!Config.info)
                flink_env.getConfig().disableSysoutLogging();
        } catch (Exception ex) {
            // keep the original exception as the cause so that its stack trace is not lost
            throw new Error("Cannot initialize the Flink evaluator: "+ex,ex);
        }
    }

    /** Create a new Hadoop Configuration object.
     * @return a freshly constructed Configuration
     */
    final public Configuration new_configuration () {
        final Configuration fresh = new Configuration();
        return fresh;
    }

    /** The input format class used for parsed files (CSV, XML, JSON, etc).
     * @return FlinkParsedInputFormat.class
     */
    final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
        final Class<? extends MRQLFileInputFormat> format = FlinkParsedInputFormat.class;
        return format;
    }

    /** The input format class used for binary files.
     * @return FlinkSequenceInputFormat.class
     */
    final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
        final Class<? extends MRQLFileInputFormat> format = FlinkSequenceInputFormat.class;
        return format;
    }

    /** The input format class used for data generator files.
     * @return FlinkGeneratorInputFormat.class
     */
    final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
        final Class<? extends MRQLFileInputFormat> format = FlinkGeneratorInputFormat.class;
        return format;
    }

    /** Evaluate a plan in stream mode by delegating to FlinkStreaming.evaluate.
     * @param plan the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @param f the function handed to FlinkStreaming.evaluate together with the plan
     */
    final public void streaming ( Tree plan, Environment env, Function f ) {
        FlinkStreaming.evaluate(plan, env, f);
    }

    /** Convert a path name to a normalized absolute URI string.
     * A path that already carries a URI scheme is only normalized. A path without a scheme
     * is resolved as follows: in Hadoop non-local mode it is prefixed with fs_default_name
     * (and with /user/&lt;user.name&gt;/ when it is relative); otherwise it becomes a file: URI
     * (relative to the user.dir system property when it is relative).
     * @param path the path name to resolve
     * @return the normalized URI as a string
     * @throws Error (with the original exception as its cause) if the path is not a valid URI
     */
    public static String absolute_path ( String path) {
        try {
            URI uri = new URI(path);
            if (uri.getScheme() == null) {
                final boolean rooted = uri.toString().startsWith("/");
                if (Config.hadoop_mode && !Config.local_mode) {
                    if (rooted)
                        uri = new URI(fs_default_name+uri);
                    else uri = new URI(fs_default_name+"/user/"+System.getProperty("user.name")+"/"+uri);
                } else {
                    if (rooted)
                        uri = new URI("file:"+uri);
                    else uri = new URI("file:"+System.getProperty("user.dir")+"/"+uri);
                }
            }
            return uri.normalize().toString();
        } catch (Exception ex) {
            // keep the original exception as the cause so that the reason is not lost
            throw new Error("Wrong pathname: "+path,ex);
        }
    }

    /** Return the MRQL type of the data that was dumped to a file.
     * The file name is first resolved with absolute_path and then passed to the superclass.
     * @param file the name of the dump file
     * @return the type returned by the superclass for the resolved path
     */
    @Override
    public Tree get_type ( String file ) {
        final String resolved = absolute_path(file);
        return super.get_type(resolved);
    }

    /** Dump MRQL data into a Hadoop Sequence file.
     * The file name is first resolved with absolute_path and then passed to the superclass.
     * @param file the name of the output file
     * @param type the MRQL type of the data
     * @param data the data to dump
     */
    @Override
    public void dump ( String file, Tree type, MRData data ) throws Exception {
        final String resolved = absolute_path(file);
        super.dump(resolved,type,data);
    }

    /** Dump MRQL data into a text CSV file.
     * A collection of tuples is written one tuple per line with comma-separated components;
     * a collection of other values is written one value per line; any other value is written
     * on a single line using query_type. An MR_dataset is collected first with Plan.collect.
     * While printing, Config.max_bag_size_print is set to -1; the previous value is restored
     * and the output stream is closed even when writing fails.
     * @param file the name of the output file (resolved with absolute_path, overwritten if present)
     * @param type the MRQL type of the data
     * @param data the data to dump
     */
    @Override
    public void dump_text ( String file, Tree type, MRData data ) throws Exception {
        final int ps = Config.max_bag_size_print;
        Config.max_bag_size_print = -1;
        PrintStream out = null;
        try {
            Path path = new Path(absolute_path(file));
            FileSystem fs = path.getFileSystem();
            out = new PrintStream(fs.create(path,true));
            if (data instanceof MR_dataset)
                data = Plan.collect(((MR_dataset)data).dataset());
            if (Translator.collection_type(type)) {
                Tree tp = ((Node)type).children().head();
                if (tp instanceof Node && ((Node)tp).name().equals("tuple")) {
                    Trees ts = ((Node)tp).children();
                    for ( MRData x: (Bag)data ) {
                        Tuple t = (Tuple)x;
                        out.print(print(t.get((short)0),ts.nth(0)));
                        for ( short i = 1; i < t.size(); i++ )
                            out.print(","+print(t.get(i),ts.nth(i)));
                        out.println();
                    }
                } else for ( MRData x: (Bag)data )
                           out.println(print(x,tp));
            } else out.println(print(data,query_type));
        } finally {
            // undo the global setting and release the file even if printing failed
            Config.max_bag_size_print = ps;
            if (out != null)
                out.close();
        }
    }

    /* Aggregate a Flink DataSet into one value: the data set is passed through a
     * FlinkDataSource.reduce_mapper (built from zero and the printed forms of acc_fnc and
     * merge_fnc), the job is executed, and the "reducer" accumulator result (a string) is
     * parsed back into an MRData value of the given type using the MRQL query parser.
     * TopLevel.query_type is restored after parsing, even when parsing fails.
     * Throws an Error (with the original exception as its cause) if execution or parsing fails. */
    private static MRData aggregate ( DataSet<FData> data_set, MRData zero, Tree acc_fnc, Tree merge_fnc, Tree type ) {
        DataSet<FData> d = data_set.flatMap(new FlinkDataSource.reduce_mapper(zero,acc_fnc.toString(),
                                                                              merge_fnc.toString()));
        //d.print();  // needs a datasink (prints the aggregation value)
        d.output(new DiscardingOutputFormat<FData>());
        try {
            // due to a Flink bug, we cannot return a custom object (FData) from an accumulator
            String value = FlinkEvaluator.flink_env.execute("MRQL aggregator").getAccumulatorResult("reducer");
            final Tree tp = TopLevel.query_type;
            try {
                // parse the value using the MRQL query parser (ugly)
                return MRQL.query(value+" as "+type);
            } finally {
                // the saved type is put back even if the parser throws
                TopLevel.query_type = tp;
            }
        } catch (Exception ex) {
            throw new Error("Evaluation error during aggregation: "+ex,ex);
        }
    }

    /* Map function used in repeat loops: when opened, it reads the broadcast variable named
     * var, converts it to a Bag, and binds it globally under that name; each input tuple is
     * then mapped to its first component. */
    public static final class set_environment extends RichMapFunction<FData,FData> {
        String var;

        set_environment ( String v ) {
            this.var = v;
        }

        @Override
        public void open ( org.apache.flink.configuration.Configuration parameters ) throws Exception {
            final List<Object> broadcast = getRuntimeContext().getBroadcastVariable(var);
            final Bag loop_value = bag(broadcast);
            final String name = var.toString();
            Interpreter.new_global_binding(name,loop_value);
        }

        @Override
        public FData map ( FData value ) throws Exception {
            final Tuple pair = (Tuple)value.data();
            return new FData(pair.first());
        }
    }

    /* build a Bag from the data() of every element of values (each element is cast to FData) */
    private static Bag bag ( Collection<Object> values ) {
        final Bag result = new Bag();
        final Iterator<Object> it = values.iterator();
        while (it.hasNext()) {
            final FData wrapped = (FData)it.next();
            result.add(wrapped.data());
        }
        return result;
    }

    /* wrap every element of the Bag in an FData and hand the list to flink_env.fromCollection */
    private static DataSet<FData> dataset ( Bag s ) {
        final List<FData> wrapped = new ArrayList<FData>(s.size());
        final Iterator<MRData> it = s.iterator();
        while (it.hasNext())
            wrapped.add(new FData(it.next()));
        return flink_env.fromCollection(wrapped);
    }

    /** Coerce a persistent collection to a Bag.
     * An MR_flink value is collected from Flink and exposed through a Bag that iterates
     * over the collected elements; any other value is cast to Bag.
     * @param data an MR_flink or a Bag
     * @return the data as a Bag
     * @throws Error (with the original exception as its cause) if the data cannot be
     *         collected or is not a Bag
     */
    public Bag toBag ( MRData data ) {
        try {
            if (!(data instanceof MR_flink))
                return (Bag)data;
            final Iterator<FData> i = ((MR_flink)data).flink().collect().iterator();
            return new Bag(new BagIterator() {
                    public MRData next () {
                        return i.next().data();
                    }
                    public boolean hasNext () {
                        return i.hasNext();
                    }
                });
        } catch (Exception ex) {
            // keep the original exception as the cause so that its stack trace is not lost
            throw new Error("Cannot coerce "+data+" to a Bag: "+ex,ex);
        }
    }

    /** The Aggregate physical operator.
     * The plan is matched against the supported aggregation forms; each form builds a Flink
     * DataSet and passes it to the private aggregate helper. The repeat form is a special
     * case: it runs a Flink bulk iteration and returns the resulting Bag instead.
     * @param merge  the accumulator function from (T,T) to T
     * @param zero  the zero element of type T
     * @param plan the plan that constructs the dataset that contains the bag of values {T}
     * @param env contains bindings from variables to values (MRData)
     * @return the aggregation result of type T
     * @throws Error if the plan matches none of the supported forms
     */
    final public MRData aggregate ( Tree merge,
                                    Tree zero,
                                    Tree plan,
                                    Environment env ) throws Exception {
        // the zero value is evaluated once on the client; its inferred type is used to parse the result
        final MRData z = evalE(zero,env);
        Tree type = TypeInference.type_inference(zero);
        match plan {
        case repeat(lambda(`v,`b),`s,`n):
            // Special case for Flink: repetition where the loop result must be shared among nodes
            final int max_num = ((MR_int)evalE(n,null)).get();
            final Bag init = (Bag)evalE(s,env);
            // bind the loop variable to the initial value before the loop body is translated
            Interpreter.new_global_binding(v.toString(),init);
            final DataSet<FData> ds = dataset(init);
            final IterativeDataSet<FData> startOfIteration = ds.iterate(max_num);
            final DataSet<FData> step = eval(b,env);
            // the current loop value is broadcast under the name of the loop variable
            final DataSet<FData> toBeFedBack = step.map(new set_environment(v.toString()))
                                                   .withBroadcastSet(startOfIteration,v.toString());
            // Flink doesn't allow termination criteria for kmeans-like iterations:
            //   (java.lang.IllegalStateException: Error: Iterative task without a single iterative input)
            //final DataSet<FData> terminationCriterion = step.filter(new repeat_filter());
            final DataSet<FData> res = startOfIteration.closeWith(toBeFedBack);//,terminationCriterion);
            // write the loop result to a new path, run the job, and read the result back into a Bag
            String path = absolute_path(Plan.new_path(Plan.conf));
            res.write(new FlinkOutputFormat(),path);
            flink_env.execute("MRQL repeat");
            Bag bs = new Bag();
            for ( MRData x: new FlinkDataSource(res,path,false).take(-1) )
                bs.add(x);
            return bs;
        case AggregateMap(`m,`acc,_,`s):
            // each partition is pre-aggregated with acc; the partial results are combined with merge
            final DataSet<FData> ds = eval(s,env).mapPartition(new aggregate_partition(m,acc,z));
            return aggregate(ds,z,merge,merge,type);
        case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
            // without an accumulator this case does not apply
            if (acc.equals(#<null>))
                fail;
            final DataSet<FData> ds = eval(s,env).flatMap(cmap_fnc(m));
            // group by the first component of each pair
            final DataSet<FData> gs = ds.groupBy(new KeySelector<FData,FData>() {
                                                      @Override
                                                      public FData getKey ( FData value ) {
                                                          return new FData(((Tuple)value.data()).first());
                                                      }
                                             }).reduceGroup(new groupBy_combiner_reducer(r,acc,z));
            return aggregate(gs,z,merge,merge,type);
        case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
            if (acc.equals(#<null>))
                fail;
            final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
            final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
            final DataSet<FData> ds = xs.cross(ys).with(new cross_combiner_reducer(r,acc,z));
            return aggregate(ds,z,merge,merge,type);
        case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
            if (acc.equals(#<null>))
                fail;
            final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
            final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
            // co-group both inputs on the first component of their pairs
            final DataSet<FData> ds = xs.coGroup(ys)
                .where(new join_key())
                .equalTo(new join_key())
                .with(new join_combiner_reducer(r,acc,z));
            return aggregate(ds,z,merge,merge,type);
        case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
            if (acc.equals(#<null>))
                fail;
            // evaluate the join as a MapJoin and aggregate its output with acc and merge
            return aggregate(evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env),z,acc,merge,type);
        };
        throw new Error("Unrecognized aggregation: "+plan);
    }

    /** Evaluate a loop a fixed number of times. Doesn't use Flink iterations
     * @param e the loop plan, of the form Loop(lambda(tuple(vs),tuple(bs)),tuple(ss),num)
     * @param env contains bindings from variables to values (MRData)
     * @return a Tuple with one MR_flink value per loop variable
     * @throws Error if e does not have the Loop form
     */
    final public Tuple loop ( Tree e, Environment env ) throws Exception {
        match e {
        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
            final int limit = ((MR_int)evalE(num,env)).get();
            // the initial value of each loop variable comes from the corresponding plan in ss
            MR_flink[] s = new MR_flink[vs.length()];
            for ( int i = 0; i < vs.length(); i++ )
                s[i] = new MR_flink(eval(ss.nth(i),env));
            for ( int n = 0; n < limit; n++ ) {
                // bind every loop variable to its current value ...
                Environment nenv = env;
                for ( int i = 0; i < vs.length(); i ++ )
                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
                // ... and recompute each value from the corresponding body in bs
                for ( int i = 0; i < vs.length(); i ++ )
                    s[i] = new MR_flink(eval(bs.nth(i),nenv));
            };
            return new Tuple(s);
        };
        throw new Error("Wrong Loop format: "+e);
    }

    /* returns true when the plan is a MapReduce or a MapCombineReduce whose last
     * argument is the literal true; returns false for every other plan */
    private static boolean needs_sorting ( Tree e ) {
        match e {
        case MapReduce(`m,`r,`s,true): return true;
        case MapCombineReduce(`m,`c,`r,`s,true): return true;
        };
        return false;
    }

    /** Evaluate an MRQL physical plan and print tracing info.
     * The plan is translated to a Flink DataSet, which is written with FlinkOutputFormat to a
     * new path (Plan.new_path resolved by absolute_path); the Flink job is then executed.
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @param counter not used by this implementation
     * @return an MRQL DataSet backed by a FlinkDataSource over the written path
     * @throws Error (with the original exception as its cause) if the job cannot be executed
     */
    final public org.apache.mrql.DataSet eval ( final Tree e,
                                                final Environment env,
                                                final String counter ) {
        DataSet<FData> d = eval(e,env);
        try {
            String path = absolute_path(Plan.new_path(Plan.conf));
            d.write(new FlinkOutputFormat(),path);
            flink_env.execute("MRQL query");
            return new org.apache.mrql.DataSet(new FlinkDataSource(d,path,needs_sorting(e)),0L,0L);
        } catch (Exception ex) {
            System.err.println(ex.getMessage());
            // keep the original exception as the cause so that its stack trace is not lost
            throw new Error("Cannot evaluate the query: "+e,ex);
        }
    }

    /** Evaluate an MRQL physical plan using Flink and print tracing info.
     * When Config.trace_execution is set, the plan is printed before evaluation and the
     * first Config.max_bag_size_print elements of the result are printed afterwards,
     * indented by the current nesting depth.
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @return a Flink DataSet
     */
    final public DataSet<FData> eval ( final Tree e, final Environment env ) {
        if (Config.trace_execution) {
            tab_count += 3;
            System.out.println(tabs(tab_count)+print_query(e));
        }
        final DataSet<FData> result = evalD(e,env);
        if (!Config.trace_execution)
            return result;
        try {
            System.out.println(tabs(tab_count)+"->");
            final DataSet<FData> sample = result.first(Config.max_bag_size_print);
            sample.print();
            tab_count -= 3;
        } catch (Exception ex) {
            System.err.println(ex.getMessage());
            throw new Error("Cannot collect the operator output: "+e);
        }
        return result;
    }

    /* turn an MRQL lambda into a Flink flat-map function: the lambda is evaluated once
     * into a Function, which is applied to each input value; every element of the
     * resulting Bag is emitted */
    private static RichFlatMapFunction<FData,FData> cmap_fnc ( final Tree fnc ) {
        final Function mapper = evalF(fnc,null);
        return new RichFlatMapFunction<FData,FData>() {
            @Override
            public void flatMap ( FData value, Collector<FData> out ) throws Exception {
                final Bag produced = (Bag)mapper.eval(value.data());
                for ( MRData element: produced ) {
                    out.collect(new FData(element));
                }
            }
        };
    }

    /* collect the second component of every pair in values into a new Bag */
    private static Bag bag ( Iterable<FData> values ) {
        final Bag seconds = new Bag();
        final Iterator<FData> it = values.iterator();
        while (it.hasNext()) {
            final Tuple pair = (Tuple)it.next().data();
            seconds.add(pair.second());
        }
        return seconds;
    }

    /* group-by s and then reduce by reduce_fnc (optional: use combine_fnc)
     * s holds (key,value) pairs; they are grouped by key. When combine_fnc is not null,
     * each group is first passed through a combine step that re-emits (key,value) pairs.
     * The reducer is applied to (key, bag of values) and every element of its result is emitted. */
    private static DataSet<FData> groupBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
        final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
        final Function reducer = evalF(reduce_fnc,null);
        DataSet<FData> ds = s;
        if (combiner != null) {
            ds = ds.groupBy(new KeySelector<FData,FData>() {
                    @Override
                    public FData getKey ( FData value ) {
                        // the grouping key is the first component of the pair
                        return new FData(((Tuple)value.data()).first());
                    }
            }).combineGroup(new RichGroupCombineFunction<FData,FData>() {
                @Override
                public void combine ( Iterable<FData> values, Collector<FData> out ) throws Exception {
                    Bag c = null;
                    MRData key = null;
                    for ( FData v: values )
                        if (c == null) {
                            // the first pair starts the bag and supplies the key
                            c = new Bag(((Tuple)v.data()).second());
                            key = ((Tuple)v.data()).first();
                        } else
                            // every later value is added to the bag, which is then re-combined
                            c = (Bag)combiner.eval(new Tuple(key,c.add_element(((Tuple)v.data()).second())));
                    // NOTE(review): c stays null if values is empty; presumably never the case here - confirm
                    for ( MRData x: c )
                        out.collect(new FData(new Tuple(key,x)));
                }
           });
        };
        return ds.groupBy(new KeySelector<FData,FData>() {
                @Override
                public FData getKey ( FData value ) {
                    return new FData(((Tuple)value.data()).first());
                }
            }).reduceGroup(new GroupReduceFunction<FData,FData>() {
                @Override
                public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
                    // gather the values of the group; key ends up as the key of the last pair seen
                    Bag s = new Bag();
                    MRData key = null;
                    for ( FData value: values ) {
                        Tuple t = (Tuple)value.data();
                        key = t.first();
                        s.add(t.second());
                    };
                    for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
                        out.collect(new FData(v));
                }
           });
    }

    /* group-by and sort s and then reduce by reduce_fnc (optional: use combine_fnc)
     * s holds (key,value) pairs, which are first converted to Flink Tuple2 so that field 0
     * (the key) can be used for grouping and sorting. When combine_fnc is not null, each
     * group is first passed through a combine step that re-emits (key,value) pairs.
     * The reducer is applied to (key, bag of values) and every element v of its result is
     * emitted as the pair (key,v). */
    private static DataSet<FData> sortBy ( DataSet<FData> s, Tree combine_fnc, Tree reduce_fnc ) {
        final Function combiner = (combine_fnc == null) ? null : evalF(combine_fnc,null);
        final Function reducer = evalF(reduce_fnc,null);
        DataSet<Tuple2<FData,FData>> ds = s.map(new RichMapFunction<FData,Tuple2<FData,FData>>() {
                @Override
                public Tuple2<FData,FData> map ( FData value ) {
                    Tuple t = (Tuple)value.data();
                    return new Tuple2<FData,FData>(new FData(t.first()),new FData(t.second()));
                }
            });
        if (combiner != null) {
            ds = ds.groupBy(0).combineGroup(new GroupCombineFunction<Tuple2<FData,FData>,Tuple2<FData,FData>>() {
                    @Override
                    public void combine ( Iterable<Tuple2<FData,FData>> values,
                                          Collector<Tuple2<FData,FData>> out ) throws Exception {
                        Bag c = null;
                        FData key = null;
                        for ( Tuple2<FData,FData> v: values )
                            if (c == null) {
                                // the first pair starts the bag and supplies the key
                                c = new Bag(v.f1.data());
                                key = v.f0;
                            } else
                                // add the VALUE (f1) of the pair to the bag, as done for the first
                                // pair above and in groupBy; f0 is the key and must not be combined
                                c = (Bag)combiner.eval(new Tuple(key.data(),c.add_element(v.f1.data())));
                        for ( MRData x: c )
                            out.collect(new Tuple2<FData,FData>(key,new FData(x)));
                    }
                });
        };
        return ds.groupBy(0).sortGroup(0,Order.ASCENDING)
            .reduceGroup(new RichGroupReduceFunction<Tuple2<FData,FData>,FData>() {
                    @Override
                    public void reduce ( final Iterable<Tuple2<FData,FData>> values, Collector<FData> out ) {
                        // gather the values of the group; key ends up as the key of the last pair seen
                        Bag s = new Bag();
                        MRData key = null;
                        for ( Tuple2<FData,FData> value: values ) {
                            key = value.f0.data();
                            s.add(value.f1.data());
                        };
                        for ( MRData v: (Bag)reducer.eval(new Tuple(key,s)) )
                            out.collect(new FData(new Tuple(key,v)));
                    }
               });
    }

    /** Key selector that uses the first component of a pair as the key */
    public static final class join_key implements KeySelector<FData,FData> {
        @Override
        public FData getKey ( FData value ) {
            final Tuple pair = (Tuple)value.data();
            return new FData(pair.first());
        }
    }

    /** Aggregates one partition: m is applied to every input value and each element of the
     *  resulting Bag is folded into a running value with acc, starting from zero; the final
     *  value is emitted once per partition. */
    public static final class aggregate_partition extends RichMapPartitionFunction<FData,FData> {
        final Function m;
        final Function acc;
        final MRData zero;

        aggregate_partition ( Tree m, Tree acc, MRData zero ) {
            this.m = evalF(m,null);
            this.acc = evalF(acc,null);
            this.zero = zero;
        }

        @Override
        public void mapPartition ( Iterable<FData> values, Collector<FData> out ) throws Exception {
            MRData running = zero;
            final Iterator<FData> it = values.iterator();
            while (it.hasNext()) {
                final Bag mapped = (Bag)m.eval(it.next().data());
                for ( MRData element: mapped )
                    running = acc.eval(new Tuple(running,element));
            }
            out.collect(new FData(running));
        }
    }

    /** Reduces one group and aggregates the reducer output: r is applied to the group and
     *  each element of the resulting Bag is folded into a running value with acc, starting
     *  from zero; the final value is emitted once per group. */
    public static final class groupBy_combiner_reducer extends RichGroupReduceFunction<FData,FData> {
        final Function r;
        final Function acc;
        final MRData zero;

        groupBy_combiner_reducer ( Tree r, Tree acc, MRData zero ) {
            this.r = evalF(r,null);
            this.acc = evalF(acc,null);
            this.zero = zero;
        }

        @Override
        public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
            // NOTE(review): the reducer receives a one-element tuple that holds only the bag of
            // values, whereas groupBy passes (key,bag) to its reducer - confirm this is intended
            final Tuple group = new Tuple(bag(values));
            final Bag reduced = (Bag)r.eval(group);
            MRData running = zero;
            for ( MRData element: reduced )
                running = acc.eval(new Tuple(running,element));
            out.collect(new FData(running));
        }
    }

    /** Co-group function: applies f to the pair (bag of left values, bag of right values)
     *  and emits every element of the resulting Bag. */
    public static final class join_reducer extends RichCoGroupFunction<FData,FData,FData> {
        final Function f;

        join_reducer ( Tree fnc ) {
            this.f = evalF(fnc,null);
        }

        @Override
        public void coGroup ( Iterable<FData> xs, Iterable<FData> ys, Collector<FData> out ) {
            final Tuple groups = new Tuple(bag(xs),bag(ys));
            final Bag joined = (Bag)f.eval(groups);
            for ( MRData element: joined )
                out.collect(new FData(element));
        }
    }

    /** Co-group function with aggregation: applies r to the pair (bag of left values,
     *  bag of right values), folds every element of the resulting Bag into a running value
     *  with acc, starting from zero, and emits the final value. */
    public static final class join_combiner_reducer extends RichCoGroupFunction<FData,FData,FData> {
        final Function r;
        final Function acc;
        final MRData zero;

        join_combiner_reducer ( Tree r, Tree acc, MRData zero ) {
            this.r = evalF(r,null);
            this.acc = evalF(acc,null);
            this.zero = zero;
        }

        @Override
        public void coGroup ( Iterable<FData> xs, Iterable<FData> ys, Collector<FData> out ) {
            final Tuple groups = new Tuple(bag(xs),bag(ys));
            final Bag joined = (Bag)r.eval(groups);
            MRData running = zero;
            for ( MRData element: joined )
                running = acc.eval(new Tuple(running,element));
            out.collect(new FData(running));
        }
    }

    /** Cross function: applies f to the pair (x,y) and wraps the result */
    public static final class cross_reducer extends RichCrossFunction<FData,FData,FData> {
        final Function f;

        cross_reducer ( Tree fnc ) {
            this.f = evalF(fnc,null);
        }

        @Override
        public FData cross ( FData x, FData y ) throws Exception {
            final Tuple pair = new Tuple(x.data(),y.data());
            return new FData(f.eval(pair));
        }
    }

    /** Cross function with aggregation: applies r to the pair (x,y) and then applies acc to
     *  (zero, that result); the outcome is wrapped and returned. */
    public static final class cross_combiner_reducer extends RichCrossFunction<FData,FData,FData> {
        final Function r;
        final Function acc;
        final MRData zero;

        cross_combiner_reducer ( Tree r, Tree acc, MRData zero ) {
            this.r = evalF(r,null);
            this.acc = evalF(acc,null);
            this.zero = zero;
        }

        @Override
        public FData cross ( FData x, FData y ) throws Exception {
            final Tuple pair = new Tuple(x.data(),y.data());
            final MRData reduced = r.eval(pair);
            final MRData accumulated = acc.eval(new Tuple(zero,reduced));
            return new FData(accumulated);
        }
    }

    /** Join function: applies f to the pair (x,y) and wraps the result */
    public static final class mapjoin_reducer extends RichJoinFunction<FData,FData,FData> {
        final Function f;

        mapjoin_reducer ( Tree fnc ) {
            this.f = evalF(fnc,null);
        }

        @Override
        public FData join ( FData x, FData y ) throws Exception {
            final Tuple pair = new Tuple(x.data(),y.data());
            return new FData(f.eval(pair));
        }
    }

    /** Identity map that carries the global bindings of the client: Interpreter.global_env is
     *  captured when the object is constructed and installed with
     *  Interpreter.set_global_bindings when the function is opened. */
    public static final class restore_global_functions extends RichMapFunction<FData,FData> {
        Environment client_env;

        restore_global_functions () {
            this.client_env = Interpreter.global_env;
        }

        @Override
        public void open ( org.apache.flink.configuration.Configuration parameters ) throws Exception {
            Interpreter.set_global_bindings(this.client_env);
        }

        @Override
        public FData map ( FData value ) throws Exception {
            // the data passes through unchanged
            return value;
        }
    }

    /** Wraps each generated Long in an MR_long inside an FData */
    public static final class generator_mapper extends RichMapFunction<Long,FData> {
        @Override
        public FData map ( Long value ) throws Exception {
            final MR_long number = new MR_long(value);
            return new FData(number);
        }
    }

    /** Parses each input string with an instance of the given parser class and emits every
     *  value returned by the parser. */
    public static final class parsed_mapper extends RichFlatMapFunction<String,FData> {
        final Parser parser;

        parsed_mapper ( Class<? extends Parser> parser_class ) throws Exception {
            this.parser = parser_class.newInstance();
        }

        @Override
        public void flatMap ( String value, Collector<FData> out ) throws Exception {
            for ( MRData parsed: parser.parse(value) ) {
                out.collect(new FData(parsed));
            }
        }
    }

    /** Replicates every left input value over one column of an m x n grid of cells.
     *  The column is the hash of fgx(value) reduced to the range [0,m); the value is emitted
     *  once for each of the n rows under the key column+m*row, tagged with the byte 1.
     *  The remainder of the hash is made non-negative because hashCode() may be negative,
     *  which would otherwise produce keys that never coincide with those of group_join_right. */
    public static final class group_join_left extends RichFlatMapFunction<FData,FData> {
        final MRData one = new MR_byte(1);
        final Function fgx;
        final int m, n;

        group_join_left ( Tree gx, int m, int n ) {
            this.m = m;
            this.n = n;
            fgx = evalF(gx,null);
        }

        @Override
        public void flatMap ( FData value, Collector<FData> out ) throws Exception {
            // the column depends only on the value, so it is computed once
            int column = fgx.eval(value.data()).hashCode() % m;
            if (column < 0)
                column += m;
            for ( int i = 0; i < n; i++ ) {
                final MRData key = new MR_int(column+m*i);
                out.collect(new FData(new Tuple(key,new Tuple(one,value.data()))));
            }
        }
    }

    /** Replicates every right input value over one row of an m x n grid of cells.
     *  The row is the hash of fgy(value) reduced to the range [0,n); the value is emitted
     *  once for each of the m columns under the key row*m+column, tagged with the byte 2.
     *  The remainder of the hash is made non-negative because hashCode() may be negative,
     *  which would otherwise produce keys that never coincide with those of group_join_left. */
    public static final class group_join_right extends RichFlatMapFunction<FData,FData> {
        final MRData two = new MR_byte(2);
        final Function fgy;
        final int m, n;

        group_join_right ( Tree gy, int m, int n ) {
            this.m = m;
            this.n = n;
            fgy = evalF(gy,null);
        }

        @Override
        public void flatMap ( FData value, Collector<FData> out ) throws Exception {
            // the row depends only on the value, so it is computed once
            int row = fgy.eval(value.data()).hashCode() % n;
            if (row < 0)
                row += n;
            for ( int j = 0; j < m; j++ ) {
                final MRData key = new MR_int(row*m+j);
                out.collect(new FData(new Tuple(key,new Tuple(two,value.data()))));
            }
        }
    }

    /** Reduces one cell of a group-by join: the tagged values of the group are split into a
     *  left bag (tag 1) and a right bag (any other tag), each element paired with the group
     *  key, and both bags are handed to MapReduceAlgebra.mergeGroupByJoin; every element of
     *  its result is emitted. */
    public static final class group_join_reducer extends RichGroupReduceFunction<FData,FData> {
        final Function fkx, fky, fgx, fgy, fc, fr;
        final MRData z;

        group_join_reducer ( Tree kx, Tree ky, Tree gx, Tree gy, Tree c, Tree r, Tree zero ) {
            this.fkx = evalF(kx,null);
            this.fky = evalF(ky,null);
            this.fgx = evalF(gx,null);
            this.fgy = evalF(gy,null);
            this.fr = evalF(r,null);
            this.fc = evalF(c,null);
            this.z = evalE(zero,null);
        }

        @Override
        public void reduce ( final Iterable<FData> values, Collector<FData> out ) {
            final Bag left = new Bag();
            final Bag right = new Bag();
            final Iterator<FData> it = values.iterator();
            while (it.hasNext()) {
                final Tuple keyed = (Tuple)it.next().data();
                final Tuple tagged = (Tuple)keyed.second();
                final boolean from_left = ((MR_byte)tagged.first()).get() == 1;
                final Tuple entry = new Tuple(keyed.first(),tagged.second());
                if (from_left)
                    left.add(entry);
                else right.add(entry);
            }
            final Bag merged = MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fc,z,fr,left,right);
            for ( MRData element: merged )
                out.collect(new FData(element));
        }
    }

    /** Maps a pair to its first component; evalD's Repeat case uses it to
     *  obtain the data set that is fed back into the iteration */
    public static final class repeat_values extends RichMapFunction<FData,FData> {
        @Override
        public FData map ( FData value ) throws Exception {
            final Tuple pair = (Tuple)value.data();
            return new FData(pair.first());
        }
    }

    /** Keeps the pairs whose second component is the boolean true; evalD's Repeat
     *  case uses it to build the termination-criterion data set of the iteration */
    public static final class repeat_filter extends RichFilterFunction<FData> {
        @Override
        public boolean filter ( FData value ) throws Exception {
            final Tuple pair = (Tuple)value.data();
            final MR_bool flag = (MR_bool)pair.second();
            return flag.get();
        }
    }

    /** Evaluate a MRQL physical plan
     * Dispatches on the root operator of the plan and builds the corresponding
     * Flink DataSet program; sub-plans are evaluated through eval and scalar
     * sub-expressions through evalE.
     * @param e the physical plan
     * @param env contains bindings from variables to values (MRData)
     * @return a Flink DataSet
     * @throws Error if the plan cannot be evaluated, if a variable is unbound, or
     *         if a variable is bound to a value that is not a data set
     */
    final private DataSet<FData> evalD ( final Tree e, final Environment env ) {
        try {
            match e {
            // aggregate forms without an accumulator reduce to the plain operators
            case MapAggregateReduce(`m,`r,null,_,`s,`o):
                return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
            case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
                return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
            case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
            case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
                return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
            case cMap(`f,`s):
                return eval(s,env).flatMap(cmap_fnc(f));
            case MapReduce(`m,`r,`s,true):
                return sortBy(eval(s,env).flatMap(cmap_fnc(m)),null,r);
            case MapReduce(`m,`r,`s,`o):
                return groupBy(eval(s,env).flatMap(cmap_fnc(m)),null,r);
            case MapCombineReduce(`m,`c,`r,`s,true):
                return sortBy(eval(s,env).flatMap(cmap_fnc(m)),c,r);
            case MapCombineReduce(`m,`c,`r,`s,`o):
                return groupBy(eval(s,env).flatMap(cmap_fnc(m)),c,r);
            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
                // the combiner c is not used: evaluated as a plain MapReduce2
                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
            case MapReduce2(`mx,`my,`r,`x,`y,`o):
                final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
                final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
                final DataSet<FData> ds = xs.coGroup(ys)
                    .where(new join_key())
                    .equalTo(new join_key())
                    .with(new join_reducer(r));
                if (o.equals(#<true>))
                    return sortBy(ds,null,#<lambda(x,bag(x))>);
                else return ds;
            case MapJoin(`mx,`my,`r,`x,`y):
                final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
                final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
                return xs.joinWithTiny(ys)
                    .where(new join_key())
                    .equalTo(new join_key())
                    .with(new mapjoin_reducer(r));
            case CrossProduct(`mx,`my,`r,`x,`y):
                final DataSet<FData> xs = eval(x,env).flatMap(cmap_fnc(mx));
                final DataSet<FData> ys = eval(y,env).flatMap(cmap_fnc(my));
                return xs.cross(ys).with(new cross_reducer(r))
                         .flatMap(cmap_fnc(#<lambda(x,x)>));
            case GroupByJoin(`kx,`ky,`gx,`gy,`acc,`zero,`r,`x,`y,`o):
                // a square n*n grid of partitions with n at least 2
                final int n = Math.max(2,(int)Math.floor(Math.sqrt(Config.nodes)));
                final int m = n;
                // System.err.println("Using a groupBy join on a "+n+"*"+m+" grid of partitions");
                final DataSet<FData> xs = eval(x,env).flatMap(new group_join_left(gx,m,n));
                final DataSet<FData> ys = eval(y,env).flatMap(new group_join_right(gy,m,n));
                return xs.union(ys).groupBy(new join_key())
                         .reduceGroup(new group_join_reducer(kx,ky,gx,gy,acc,r,zero));
            case Merge(`x,`y):
                return eval(x,env).union(eval(y,env));
            case Generator(`min,`max,`size):
                final long from = ((MR_long)evalE(min,null)).get();
                final long to = ((MR_long)evalE(max,null)).get();
                return flink_env.generateSequence(from,to).map(new generator_mapper())
                                .map(new restore_global_functions());
            case BinarySource(`file,_):
                final String path = absolute_path(((MR_string)evalE(file,null)).get());
                // the instance is not used here; it is constructed as in the original code
                new BinaryDataSource(path,Plan.conf);
                final FileInputFormat<FData> sf = new FlinkSequenceInputFormat().inputFormat(path);
                return flink_env.createInput(sf).map(new restore_global_functions());
            case ParsedSource(`parser,`file,...args):
                final String path = absolute_path(((MR_string)evalE(file,null)).get());
                final Class<? extends Parser> parser_class = DataSource.parserDirectory.get(parser.toString());
                if (parser_class == null)
                    throw new Error("Unknown parser: "+parser);
                // ds is not read afterwards; the data source is still constructed as in the original code
                final DataSource ds = new FlinkParsedDataSource(path,parser_class,args);
                return flink_env.readFile(new FlinkParsedInputFormat.ParsedInputFormat(path),path)
                                .map(new restore_global_functions());
            case Repeat(lambda(`v,`b),`s,`n):
                final int max_num = ((MR_int)evalE(n,null)).get();
                final IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num);
                // each step element is a pair: the first component is fed back and
                // the second one (a boolean) feeds the termination criterion
                final DataSet<FData> step = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env));
                final DataSet<FData> toBeFedBack = step.map(new repeat_values());
                final DataSet<FData> terminationCriterion = step.filter(new repeat_filter());
                return startOfIteration.closeWith(toBeFedBack,terminationCriterion);
            case Closure(lambda(`v,`b),`s,`n):
                final int max_num = ((MR_int)evalE(n,null)).get();
                IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num);
                DataSet<FData> toBeFedBack = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env));
                return startOfIteration.closeWith(toBeFedBack);
            case let(`v,DataSetCollect(`s),`body):
                MRData x = evalE(s,env);
                if (x instanceof MR_dataset)
                    x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
                else if (x instanceof Bag)
                    ((Bag)x).materialize();
                new_distributed_binding(v.toString(),x);
                return eval(body,new Environment(v.toString(),x,env));
            case let(`v,`u,`body):
                return eval(body,new Environment(v.toString(),evalE(u,null),env));
            case Let(`v,`u,`body):
                return eval(body,new Environment(v.toString(),new MR_flink(eval(u,env)),env));
            case If(`c,`x,`y):
                if (((MR_bool)evalE(c,env)).get())
                    return eval(x,env);
                else return eval(y,env);
            case nth(`u,`n):
                MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
                if (x instanceof MR_flink)
                    return ((MR_flink)x).flink();
                throw new Error("Evaluation error in: "+print_query(e));
            case `v:
                if (!v.is_variable())
                    fail;
                // a binding in the local environment takes precedence over a global one
                MRData x = variable_lookup(v.toString(),env);
                if (x != null) {
                    if (x instanceof MR_flink)
                        return ((MR_flink)x).flink();
                    // the original code built this Error without throwing it
                    throw new Error("Variable "+v+" is of type: "+x);
                }
                x = variable_lookup(v.toString(),global_env);
                if (x != null) {
                    if (x instanceof MR_flink)
                        return ((MR_flink)x).flink();
                    else if (x instanceof MR_dataset)
                        return dataset(new Bag(((MR_dataset)x).dataset.take(Integer.MAX_VALUE)))
                            .map(new restore_global_functions());
                    // the original code built this Error without throwing it, so a variable
                    // of the wrong type was reported as being unbound
                    throw new Error("Variable "+v+" is of type: "+x);
                }
                throw new Error("Variable "+v+" is not bound");
            };
            throw new Error("Cannot evaluate the Flink plan: "+e);
        } catch (Error msg) {
            // without tracing, pass on the message only
            if (!Config.trace)
                throw new Error(msg.getMessage());
            System.err.println(msg.getMessage());
            msg.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e));
        } catch (Exception ex) {
            System.err.println(ex.getMessage());
            ex.printStackTrace();
            throw new Error("Evaluation error in: "+print_query(e));
        }
    }
}
