[MRQL-101] Fix memory materialization of lazy bags
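
The Materialization pass no longer threads a Domains structure through a
single traversal and no longer runs after BSP translation or algebraic
optimization. Instead, materialize_terms is applied once to the algebraic
term, right after simplification/streamification and before plan
generation. It works in two passes: find_repeated_terms collects
direct-access terms of collection type (access_terms) and those that occur
more than once (repeat_terms), while repeated_terms also marks access terms
used inside the functional of a bulk operation (cmap, map, filter,
aggregate, mapReduce, mapReduce2, join, crossProduct), since such
functionals may be evaluated repeatedly. Each term t in repeat_terms is
then rewritten to materialize(t), so the lazy (stream-based) Bag it yields
is cached in memory instead of being re-scanned. TypeInference is extended
so that materialize(x) has the same type as x.

Illustrative sketch only (hypothetical term; assumes v is bound to a tuple
whose component 1 is a stream-based Bag, and k stands for the method number
of the bag "plus" method): a term that reads the same lazy Bag twice, such as

    callM(plus,k,nth(v,1),nth(v,1))

is rewritten by materialize_terms to

    callM(plus,k,materialize(nth(v,1)),materialize(nth(v,1)))

so the Bag is materialized once and reused from memory.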
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 3da0168..332ca89 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -82,7 +82,7 @@
 # Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0 only.
 # You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
 # Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark
+SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
 # URI of the Spark master node:
 #   to run Spark on Standalone Mode, set it to spark://`hostname`:7077
 #   to run Spark on a YARN cluster, set it to "yarn-client"
@@ -99,9 +99,9 @@
 
 
 # Optional: Flink configuration. Supports version 1.0.2 and 1.0.3
-FLINK_VERSION=1.1.2
+FLINK_VERSION=1.0.2
 # Flink installation directory
-FLINK_HOME=${HOME}/ap/flink-${FLINK_VERSION}
+FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
 # number of slots per TaskManager (typically, the number of cores per node)
 FLINK_SLOTS=4
 # memory per TaskManager
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index c138c47..f934abe 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -939,12 +939,13 @@
             ne = Simplification.simplify_all(ne);
             if (Streaming.is_streaming(ne) && !Config.incremental)
                 ne = Streaming.streamify(ne);
+            ne = Materialization.materialize_terms(ne);
             Tree plan = PlanGeneration.makePlan(ne);
             if (Config.bsp_mode) {
                 BSPTranslator.reset();
                 if (Config.trace)
                     System.out.println("Physical plan:\n"+plan.pretty(0));
-                plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
+                plan = BSPTranslator.constructBSPplan(plan);
                 if (Config.trace)
                     System.out.println("BSP plan:\n"+plan.pretty(0));
                 else {
@@ -955,7 +956,7 @@
             } else {
                 if (Config.hadoop_mode)
                     plan = PlanGeneration.physical_plan(plan);
-                plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
+                plan = AlgebraicOptimization.common_factoring(plan);
                 if (Config.trace)
                     System.out.println("Physical plan:\n"+plan.pretty(0));
                 else {
diff --git a/core/src/main/java/org/apache/mrql/Materialization.gen b/core/src/main/java/org/apache/mrql/Materialization.gen
index c056e20..e644b83 100644
--- a/core/src/main/java/org/apache/mrql/Materialization.gen
+++ b/core/src/main/java/org/apache/mrql/Materialization.gen
@@ -21,24 +21,24 @@
 import java.util.*;
 
 
-class Domains {
-    public Trees domains;
-    public Trees repeats;
-    public Domains ( Trees d, Trees r ) { domains = d; repeats = r; }
-}
-
-
 /** if the plan refers to a variable bound to a stream-based Bag and occurs in the code
  *   multiple times, embed code to materialize this Bag in memory
  */
 final public class Materialization extends Translator {
 
-    // is this a direct-access term? (not the results of a bulk operation)
+    // direct-access terms of collection type
+    private static Trees access_terms = #[];
+    // access_terms that occur more than once
+    private static Trees repeat_terms = #[];
+
+    /** is this a direct-access term? (not the result of a bulk operation) */
     private static boolean access_variable ( Tree e ) {
         match e {
         case nth(`x,_):
             return access_variable(x);
-        case union_value(`x):
+        case project(`x,_):
+            return access_variable(x);
+        case typed(union_value(`x),_):
             return access_variable(x);
         case index(`x,`n):
             return access_variable(x);
@@ -49,90 +49,77 @@
         return false;
     }
 
-    private static Domains new_domain ( Trees vars, Tree e, Domains d ) {
-        if (!access_variable(e))
-            return materialize(vars,e,d);
-        Domains nd = new Domains(d.domains,d.repeats);
-        if ((d.domains.member(e) || !free_variables(e,vars).is_empty())
-            && !d.repeats.member(e))
-            nd.repeats = nd.repeats.cons(e);
-        nd.domains = nd.domains.cons(e);
-        return nd;
-    }
-
-    private static Domains union ( Domains xd, Domains yd ) {
-        Domains nd = new Domains(xd.domains,xd.repeats);
-        for ( Tree y: yd.domains )
-            if (!nd.domains.member(y))
-                nd.domains = nd.domains.cons(y);
-        for ( Tree y: yd.repeats )
-            if (!nd.repeats.member(y))
-                nd.repeats = nd.repeats.cons(y);
-        return nd;
-    }
-
-    final static int unionM = ClassImporter.find_method_number("plus",#[bag(any),bag(any)]);
-
-    private static Domains materialize ( Trees vars, Tree e, Domains d ) {
+    /** check if the term e is a direct access term that extracts a collection */
+    private static boolean transient_term ( Tree e ) {
         match e {
-        case lambda(`v,`b):
-            return materialize(#[`v],b,d);
-        case cmap(lambda(`v,`b),`s):
-            return materialize(#[`v],b,new_domain(vars,s,d));
-        case map(lambda(`v,`b),`s):
-            return materialize(#[`v],b,new_domain(vars,s,d));
-        case filter(lambda(`v1,`b1),lambda(`v2,`b2),`s):
-            return materialize(#[`v1],b1,materialize(#[`v2],b2,new_domain(vars,s,d)));
-        case aggregate(lambda(`v,`b),`z,`s):
-            return materialize(#[`v],b,new_domain(vars,s,d));
-        case groupBy(`s):
-            return new_domain(vars,s,d);
-        case orderBy(`s):
-            return new_domain(vars,s,d);
-        case mapReduce(lambda(`mv,`m),lambda(`v,`b),`s,`o):
-            return materialize(#[`v],b,materialize(#[`mv],m,
-                                                   new_domain(vars,s,d)));
-        case mapReduce2(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y,`o):
-            return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
-                                         new_domain(vars,x,new_domain(vars,y,d)))));
-        case join(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
-            return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
-                                         new_domain(vars,x,new_domain(vars,y,d)))));
-        case crossProduct(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
-            return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
-                                         new_domain(vars,x,new_domain(vars,y,d)))));
-        case let(`v,`x,`y):
-            Domains nd = materialize(vars.cons(v),y,materialize(vars,x,d));
-            Trees zs = #[];
-            for ( Tree z: nd.repeats )
-                if (!v.equals(z))
-                    zs = zs.cons(z);
-            nd.repeats = zs;
-            return nd;
-        case if(`p,`x,`y):
-            Domains nd = materialize(vars,p,d);
-            return union(materialize(vars,x,nd),
-                         materialize(vars,y,nd));
-        case callM(union,_,`x,`y):
-            return new_domain(vars,x,new_domain(vars,y,d));
-        case callM(_,`k,`x,`y):
-            if (((LongLeaf)k).value() != unionM)
-                fail;
-            return new_domain(vars,x,new_domain(vars,y,d));
-        case callM(join_key,_,`x,`y):
-            return new_domain(vars,x,new_domain(vars,y,d));
-        case `f(...as):
-            Domains nd = new Domains(d.domains,d.repeats);
-            for ( Tree a: as )
-                nd = materialize(vars,a,nd);
-            return nd;
+        case materialize(_):
+            return false;
         };
-        return d;
+        if (e.is_variable() || !access_variable(e))
+            return false;
+        match TypeInference.type_inference(e) {
+        case `T(_):
+            if (is_collection(T))
+                return true;
+        };
+        return false;
     }
 
+    /** derive the access_terms and the repeat_terms */
+    private static void find_repeated_terms ( Tree e ) {
+        if (transient_term(e))
+            if (access_terms.member(e)) {
+                if (!repeat_terms.member(e))
+                    repeat_terms = repeat_terms.cons(e);
+            } else access_terms = access_terms.cons(e);
+        else match e {
+            case type(_): ;
+            case materialize(_): ;
+            case `f(...as):
+                for ( Tree a: as)
+                    find_repeated_terms(a);
+            }
+    }
+
+    /** if a term in access_terms occurs inside the functional of a
+        bag processing operation, add it to repeat_terms, since that
+        functional may be evaluated multiple times */
+    private static void repeated_terms ( Tree e, int level ) {
+        if (access_terms.member(e) && level > 0) {
+            if (!repeat_terms.member(e))
+                repeat_terms = repeat_terms.cons(e);
+        } else match e {
+        case `f(lambda(`v,`b),`x,...):
+            if (! #[cmap,map,filter,aggregate].member(#<`f>))
+                fail;
+            repeated_terms(b,level+1);  // the map function is repeated
+            repeated_terms(x,level);
+        case mapReduce(lambda(`v1,`b1),lambda(`v2,`b2),`x,...):
+            repeated_terms(b1,level+1);  // the map function is repeated
+            repeated_terms(b2,level+1);  // the reduce function is repeated
+            repeated_terms(x,level);
+        case `f(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v3,`b3),`x,`y,...):
+            if (! #[join,mapReduce2,crossProduct].member(#<`f>))
+                fail;
+            repeated_terms(b1,level+1);  // the left key function is repeated
+            repeated_terms(b2,level+1);  // the right key function is repeated
+            repeated_terms(b3,level+1);  // the reduce function is repeated
+            repeated_terms(x,level);
+            repeated_terms(y,level);
+        case `f(...as):
+            for ( Tree a: as)
+                repeated_terms(a,level);
+        }
+    }
+
+    /** if a direct-access term that constructs a lazy bag (an iterator)
+        is used more than once, materialize it in memory */
     public static Tree materialize_terms ( Tree e ) {
-        Domains d = materialize(#[],e,new Domains(#[],#[]));
-        for ( Tree x: d.repeats )
+        access_terms = #[];
+        repeat_terms = #[];
+        find_repeated_terms(e);
+        repeated_terms(e,0);
+        for ( Tree x: repeat_terms )
             e = subst(x,#<materialize(`x)>,e);
         return e;
     }
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 60ca56c..bc17ca4 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -1448,6 +1448,7 @@
             };
         case callM(`f,_,...el):
             return type_inference(#<call(`f,...el)>);
+        case materialize(`x): return type_inference2(x);
         case true: return #<bool>;
         case false: return #<bool>;
         case null: return #<any>;
diff --git a/pom.xml b/pom.xml
index 9ac5989..6c18bfe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -49,7 +49,7 @@
     <hama.version>0.7.1</hama.version>
     <spark.version>1.6.2</spark.version>
     <scala.version>2.10</scala.version>
-    <flink.version>1.0.3</flink.version>
+    <flink.version>1.0.2</flink.version>
     <storm.version>1.0.2</storm.version>
     <skipTests>true</skipTests>
   </properties>