[MRQL-92] Use outer-joins for incremental queries in Spark streaming mode
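
This patch reworks MRQL's incremental stream processing so that the top-level
state/delta merge of an incremental query is evaluated as a distributed outer
join (new OuterMerge and RightOuterMerge physical plans) instead of a per-key
merge over collected state. It also threads a separate dataset environment
through the evaluators so that stream state can stay distributed between
batches, tracks cached RDDs so they can be unpersisted, bumps Spark to 1.6.2
and the MRQL version to 0.9.8, and fixes the Cadd macro in
queries/factorization3.mrql.

As a minimal sketch of the intended OuterMerge semantics (plain Java with
hypothetical names, not code from this patch): the state and the batch delta
are keyed collections; keys present on both sides are combined with the
monoid's merge function, and keys unique to either side pass through unchanged.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BinaryOperator;

    class OuterMergeSketch {
        // full outer merge of a state map and a delta map under a merge function
        static <K,V> Map<K,V> outerMerge ( Map<K,V> state, Map<K,V> delta,
                                           BinaryOperator<V> merge ) {
            Map<K,V> result = new HashMap<K,V>(state);         // state-only keys survive
            for ( Map.Entry<K,V> e: delta.entrySet() )
                result.merge(e.getKey(), e.getValue(), merge); // shared keys merged, new keys added
            return result;
        }
    }
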
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index a9b16cb..77da529 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -72,12 +72,12 @@
 BSP_SPLIT_INPUT=
 
 
-# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.3.0, 1.3.1, and 1.6.0
+# Optional: Spark configuration. Supports versions 1.0.0, 1.0.2, 1.1.0, 1.1.1, 1.2.0, 1.3.0, 1.3.1, 1.6.0, and 1.6.2
 # (Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0)
 # You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
 # For distributed mode, give write permission to /tmp: hadoop fs -chmod -R 777 /tmp
 # Tested in local, standalone deploy, and Yarn modes
-SPARK_HOME=${HOME}/spark-1.6.0-bin-hadoop2.6
+SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
 # URI of the Spark master node:
 #   to run Spark on Standalone Mode, set it to spark://`hostname`:7077
 #   to run Spark on a YARN cluster, set it to "yarn-client"
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
index ac89f4e..a504087 100644
--- a/core/src/main/java/org/apache/mrql/Evaluator.java
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -159,7 +159,7 @@
     }
 
     /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
-    public void streaming ( Tree plan, Environment env, Function f ) {
+    public void streaming ( Tree plan, Environment env, Environment dataset_env, Function f ) {
         throw new Error("MRQL Streaming is not supported in this evaluation mode yet");
     }
 
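The signature change above ripples through every evaluator: env keeps the
usual bindings of variables to in-memory values (MRData), while the new
dataset_env carries bindings of variables to distributed datasets, so stream
state can remain distributed between batches (the Spark evaluator below
threads the same pair through eval/evalD as rdd_env). A minimal sketch of the
resulting two-chain lookup, using MRQL's Environment fields name/value/next;
the lookup helper itself is hypothetical:

    // a sketch: consult dataset bindings first, then plain value bindings
    class EnvLookupSketch {
        static MRData lookup ( String name, Environment env, Environment dataset_env ) {
            for ( Environment e = dataset_env; e != null; e = e.next )
                if (e.name.equals(name))
                    return e.value;    // a distributed binding, e.g. an MR_rdd
            for ( Environment e = env; e != null; e = e.next )
                if (e.name.equals(name))
                    return e.value;    // an in-memory binding
            return null;
        }
    }
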
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 054963c..600f5aa 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -606,7 +606,7 @@
             return ((Lambda)fnc).lambda().eval(t);
         case Stream(...):  // streaming
             final Tree qt = query_type;
-            Evaluator.evaluator.streaming(e,env,new Function(){
+            Evaluator.evaluator.streaming(e,env,null,new Function(){
                     public MRData eval ( final MRData data ) {
                         System.out.println(print(data,qt));
                         return data;
diff --git a/core/src/main/java/org/apache/mrql/Main.java b/core/src/main/java/org/apache/mrql/Main.java
index 087f439..cafc62f 100644
--- a/core/src/main/java/org/apache/mrql/Main.java
+++ b/core/src/main/java/org/apache/mrql/Main.java
@@ -28,7 +28,7 @@
 
 
 final public class Main {
-    public final static String version = "0.9.6";
+    public final static String version = "0.9.8";
 
     public static PrintStream print_stream;
     public static Configuration conf;
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index 1b724da..a42380c 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -418,6 +418,18 @@
                                      `(makePlan(x)),
                                      `(makePlan(y)))>;
            else fail
+       case outerMerge(`m,`x,`y):
+               if (is_dataset_expr(x) && is_dataset_expr(y))
+                   return #<OuterMerge(`(makePlan(m)),
+                                       `(makePlan(x)),
+                                       `(makePlan(y)))>;
+               else fail
+       case rightOuterMerge(`m,`x,`y):
+               if (is_dataset_expr(x) && is_dataset_expr(y))
+                   return #<RightOuterMerge(`(makePlan(m)),
+                                            `(makePlan(x)),
+                                            `(makePlan(y)))>;
+               else fail
        case cmap(`m,`s):
            if (is_dataset_expr(s))
                return #<cMap(`(makePlan(m)),
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index 18975c9..03fc28e 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -342,6 +342,12 @@
         case CrossProduct(`mx,`my,`r,`x,`y):
             return "CrossProduct:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                    +tab(n+3)+"right: "+print_plan(y,n+10,true);
+        case OuterMerge(`merge,`x,`y):
+            return "OuterMerge:\n"+tab(n+3)+"state: "+print_plan(x,n+10,true)+"\n"
+                   +tab(n+3)+"delta: "+print_plan(y,n+10,true);
+        case RightOuterMerge(`merge,`x,`y):
+            return "RightOuterMerge:\n"+tab(n+3)+"state: "+print_plan(x,n+10,true)+"\n"
+                   +tab(n+3)+"delta: "+print_plan(y,n+10,true);
         case CrossAggregateProduct(`mx,`my,`r,`a,null,`x,`y):
             return "CrossProduct:\n"+tab(n+3)+"left: "+print_plan(x,n+9,true)+"\n"
                    +tab(n+3)+"right: "+print_plan(y,n+10,true);
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
index 99ce205..f54cad8 100644
--- a/core/src/main/java/org/apache/mrql/Streaming.gen
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -932,17 +932,21 @@
     }
 
     /** Return the merge function (over X and Y) of the monoid m; type is used for key equality */
-    private static Tree merge ( Tree m, Tree type, Tree X, Tree Y ) {
+    private static Tree merge ( Tree m, Tree type, Tree X, Tree Y, boolean first ) {
         match m {
         case `gb(`n):
             if (! #[groupBy,orderBy].member(#<`gb>) )
                 fail;
             match type {
             case `T(tuple(`keytp,`tp)):
-                if (unique_key(keytp)) {
+                if (first) {
+                    Tree nv = new_var();
+                    Tree mb = merge(n,tp,#<nth(`nv,0)>,#<nth(`nv,1)>,false);
+                    return #<outerMerge(lambda(`nv,`mb),`X,`Y)>;
+                } else if (unique_key(keytp)) {
                     Tree vx = new_var();
                     Tree vy = new_var();
-                    Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>);
+                    Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>,false);
                     if (is_persistent_collection(T)) {
                         X = #<Collect(`X)>;
                         Y = #<Collect(`Y)>;
@@ -961,7 +965,7 @@
                 Tree vy = new_var();
                 Tree mx = new_var();
                 Tree my = new_var();
-                Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>);
+                Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>,false);
                 Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
                                                 `my)),
                                 `mx)>;
@@ -980,7 +984,7 @@
             Trees bs = #[ ];
             int i = 0;
             for ( Tree a: ms ) {
-                bs = bs.append(merge(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>));
+                bs = bs.append(merge(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>,first));
                 i++;
             };
             return #<tuple(...bs)>;
@@ -992,7 +996,82 @@
                 match b {
                 case bind(`n,`a):
                     cs = cs.append(#<bind(`n,`(merge(a,((Node)tps.nth(i++)).children().nth(1),
-                                                     #<project(`X,`n)>,#<project(`Y,`n)>)))>);
+                                                     #<project(`X,`n)>,#<project(`Y,`n)>,first)))>);
+                }
+            return #<record(...cs)>;
+        case union:
+            return #<call(plus,`X,`Y)>;
+        case fixed:
+            return Y;
+        case _:
+            if (!m.is_variable())
+                fail;
+            for ( Tree monoid: monoids )
+                match monoid {
+                case `aggr(`mtp,`plus,`zero,`unit):
+                    if (#<`aggr>.equals(m))
+                        return #<apply(`plus,tuple(`X,`Y))>;
+                };
+        };
+        throw new Error("Undefined monoid: "+m);
+    }
+
+    /** Return the left-merge function (over state X and delta Y) of the monoid m:
+        like merge, but groupBy results keep only the groups present in Y;
+        type is used for key equality */
+    private static Tree merge_left ( Tree m, Tree type, Tree X, Tree Y, boolean first ) {
+        match m {
+        case `gb(`n):
+            if (! #[groupBy,orderBy].member(#<`gb>) )
+                fail;
+            match type {
+            case `T(tuple(`keytp,`tp)):
+                if (first) {
+                    Tree nv = new_var();
+                    Tree mb = merge(n,tp,#<nth(`nv,0)>,#<nth(`nv,1)>,false);
+                    return #<rightOuterMerge(lambda(`nv,`mb),`X,`Y)>;
+                } else if (unique_key(keytp)) {
+                    Tree vx = new_var();
+                    Tree v = merge_left(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`Y),1)>,false);
+                    if (is_persistent_collection(T))
+                        X = #<Collect(`X)>;
+                    return #<let(`vx,`X,
+                                 if(call(exists,`vx),
+                                    bag(tuple(`keytp,`v)),
+                                    call(elem,`Y)))>;
+                };
+                // needs a right outer-join in the reducer function
+                Tree v = new_var();
+                Tree vx = new_var();
+                Tree vy = new_var();
+                Tree mx = new_var();
+                Tree mb = merge_left(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>,false);
+                Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
+                                                nth(`v,1))),
+                                `mx)>;
+                return #<join(lambda(`vx,`(key_equality(keytp,#<nth(`vx,0)>))),
+                              lambda(`vy,`(key_equality(keytp,#<nth(`vy,0)>))),
+                              lambda(`v,let(`mx,nth(`v,0),
+                                            if(call(exists,`mx),`b,nth(`v,1)))),
+                              `X, `Y)>;
+            };
+            throw new Error("Unknown type for merge_left: "+type);
+        case product(...ms):
+            Trees tps = ((Node)type).children();
+            Trees bs = #[ ];
+            int i = 0;
+            for ( Tree a: ms ) {
+                bs = bs.append(merge_left(a,tps.nth(i),#<nth(`X,`i)>,#<nth(`Y,`i)>,first));
+                i++;
+            };
+            return #<tuple(...bs)>;
+        case record(...bs):
+            Trees tps = ((Node)type).children();
+            Trees cs = #[ ];
+            int i = 0;
+            for ( Tree b: bs )
+                match b {
+                case bind(`n,`a):
+                    cs = cs.append(#<bind(`n,`(merge_left(a,((Node)tps.nth(i++)).children().nth(1),
+                                                     #<project(`X,`n)>,#<project(`Y,`n)>,first)))>);
                 }
             return #<record(...cs)>;
         case union:
@@ -1431,7 +1510,7 @@
             if (Config.trace) {
                 System.out.println("Subterm type: "+tp);
                 System.out.println("Merge function over X and Y: ");
-                System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>)).pretty(0));
+                System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>,true)).pretty(0));
             };
             type_env.insert(nv.toString(),tp);
             Tree answer = answer(ne,nv,m);
@@ -1444,8 +1523,8 @@
             };
             Tree zero = zero(m);
             Tree my = new_var();
-            Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my)));
-            Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer))>;
+            Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my,true)));
+            Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer),`m)>;
             if (Config.trace) {
                 System.out.println("Incremental subterm:");
                 System.out.println(res.pretty(0));
@@ -1457,7 +1536,7 @@
 
     private static Tree generate_incremental_code ( Tree e, Environment env ) {
         if (!is_streaming(e))
-            return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e))>;
+            return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e),fixed)>;
         match e {
         case call(stream,...):
             Tree v = new_var();
@@ -1472,50 +1551,53 @@
                 };
             if (is_monoid)
                 fail;
-            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
             int i = 0;
             for ( Tree q: qs )
                 match generate_incremental_code(q,env) {
-                case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+                case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
                     zs = zs.append(z); hs = hs.append(h); as = as.append(a);
                     vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
-                    i++;
+                    ns = ns.append(n); i++;
                 };
             return #<tuple(tuple(...zs),
                            lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
                            tuple(...hs),
-                           lambda(tuple(...vs),call(`f,...as)))>;
+                           lambda(tuple(...vs),call(`f,...as)),
+                           tuple(...ns))>;
         case record(...rs):
-            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
             int i = 0;
             for ( Tree r: rs )
                 match r {
                 case bind(`k,`q):
                     match generate_incremental_code(q,env) {
-                    case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+                    case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
                         zs = zs.append(z); hs = hs.append(h); as = as.append(a);
                         vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
-                        i++;
+                        ns = ns.append(n); i++;
                     }
                 };
             return #<tuple(tuple(...zs),
                            lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
                            tuple(...hs),
-                           lambda(tuple(...vs),record(...as)))>;
+                           lambda(tuple(...vs),record(...as)),
+                           tuple(...ns))>;
         case tuple(...qs):
-            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[], ns = #[];
             int i = 0;
             for ( Tree q: qs )
                 match generate_incremental_code(q,env) {
-                case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+                case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a),`n):
                     zs = zs.append(z); hs = hs.append(h); as = as.append(a);
                     vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
-                    i++;
+                    ns = ns.append(n); i++;
                 };
             return #<tuple(tuple(...zs),
                            lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
                            tuple(...hs),
-                           lambda(tuple(...vs),tuple(...as)))>;
+                           lambda(tuple(...vs),tuple(...as)),
+                           tuple(...ns))>;
         };
         return generate_code(e,env);
     }
@@ -1554,7 +1636,50 @@
             TypeInference.type_inference(u);
             repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
             match generate_incremental_code(u,null) {
-            case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a)):
+            case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a),`monoid):
+                Tree deltaT = new_var();
+                Tree w = new_var();
+                Tree tp = TypeInference.type_inference(s);
+                TypeInference.type_env.insert(deltaT.toString(),tp);
+                match tp {
+                case `T(`etp):
+                    TypeInference.type_env.insert(T.toString(),etp);
+                };
+                Tree m_hat = convert_to_algebra(SimplifyTerm(merge_left(monoid,tp,v1,v2,true)));
+                if (Config.trace) {
+                    System.out.println("Left-merge function over X and Y: ");
+                    System.out.println(SimplifyTerm(merge_left(monoid,tp,#<X>,#<Y>,true)).pretty(0));
+                };
+                Tree nm = subst(v1,s,subst(v2,subst(v,subst(s,deltaT,a),h),m_hat));
+                Tree m0 = subst(v,x,h); // subst(v1,s,subst(v2,subst(v,x,h),m_hat));
+                Tree q = #<repeat(lambda(`deltaT,cmap(lambda(`w,bag(tuple(`w,true))),`nm)),
+                                  `m0,
+                                  `(nn-1))>;
+                Tree res = #<incr(`z,
+                                  lambda(`s,apply(lambda(tuple(`v1,`v2),`m),tuple(`s,`q))),
+                                  lambda(`s,`a))>;
+                res = SimplifyTerm(res);
+                if (Config.trace) {
+                    System.out.println("Incremental program:");
+                    System.out.println(res.pretty(0));
+                };
+                TypeInference.type_inference(res);
+                return res;
+            }
+        case repeat(lambda(`v,`u),`x,`n):
+            if (false && !is_streaming(x))
+                fail;
+            if (!(n instanceof LongLeaf))
+                throw new Error("The repeat must have a constant repetition: "+n);
+            int nn = (int)((LongLeaf)n).value();
+            if (nn < 1)
+                throw new Error("Wrong repeat number: "+n);
+            Tree nv = new_var();
+            u = #<cmap(lambda(`nv,bag(nth(`nv,0))),`u)>;
+            TypeInference.type_inference(u);
+            repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
+            match generate_incremental_code(u,null) {
+            case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a),_):
                 Tree deltaT = new_var();
                 Tree w = new_var();
                 TypeInference.type_env.insert(deltaT.toString(),TypeInference.type_inference(s));
@@ -1576,8 +1701,8 @@
                 return res;
             }
         };
-        match generate_incremental_code(e,null) {
-        case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(_,`a)):
+        match generate_incremental_code(e,null) {
+        case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(_,`a),_):
             Tree tp = TypeInference.type_inference(h);
             bind_pattern2type(v1,tp);
             Tree nm = SimplifyTerm(#<apply(lambda(`v2,`m),`h)>);
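
The new boolean argument "first" marks the outermost groupBy/orderBy of a
merge term: at that level the merge is emitted as an outerMerge (or, in
merge_left, a rightOuterMerge) plan over the whole state and delta
collections, while nested occurrences keep the old per-group merging.
merge_left differs from merge in which groups survive: merge keeps groups
found on either side, merge_left keeps only groups present in the delta.
A minimal sketch of the merge_left / rightOuterMerge semantics (plain Java,
hypothetical names):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BinaryOperator;

    class RightOuterMergeSketch {
        // keep only the delta's groups, merging in the old state value when present
        static <K,V> Map<K,V> rightOuterMerge ( Map<K,V> state, Map<K,V> delta,
                                                BinaryOperator<V> merge ) {
            Map<K,V> result = new HashMap<K,V>();
            for ( Map.Entry<K,V> e: delta.entrySet() ) {
                V old = state.get(e.getKey());
                result.put(e.getKey(),
                           old == null ? e.getValue() : merge.apply(old,e.getValue()));
            }
            return result;  // state-only groups are dropped
        }
    }
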
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 13f4d68..9c5ecfa 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -321,7 +321,7 @@
             Evaluator.evaluator.initialize_query();
             final Tree qt = query_type;
             final String file = s.stringValue();
-            Evaluator.evaluator.streaming(plan,null,new Function(){
+            Evaluator.evaluator.streaming(plan,null,null,new Function(){
                     long i = 0;
                     public MRData eval ( final MRData data ) {
                         try {
@@ -342,7 +342,7 @@
             Evaluator.evaluator.initialize_query();
             final Tree qt = query_type;
             final String file = s.stringValue();
-            Evaluator.evaluator.streaming(plan,null,new Function(){
+            Evaluator.evaluator.streaming(plan,null,null,new Function(){
                     long i = 0;
                     public MRData eval ( final MRData data ) {
                         try {
@@ -365,27 +365,10 @@
             case incr(`zero,lambda(`state,`merge),lambda(_,`answer)):
                 final Tree qt = make_persistent_type(query_type);
                 MRData z = evalE(zero,null);
-                final Environment env = bind_list(state,z,null);
-                final boolean incr_test = false;
-                if (incr_test) {
-                    // for code testing only
-                    long t = System.currentTimeMillis();
-                    Environment nenv = bind_list(state,Streaming.unstreamify(merge),env);
-                    for ( Environment ev = nenv; ev != null; ev = ev.next )
-                        if (ev.value instanceof MR_dataset) {
-                            DataSet ds = ((MR_dataset)ev.value).dataset();
-                            List<MRData> vals = ds.take(Config.max_bag_size_print);
-                            System.out.println(ev.name+" = "+vals.size()+ " "+vals);
-                        };
-                    MRData a = Evaluator.evaluator.evalE(answer,nenv);
-                    System.out.println(print(a,qt));
-                    if (!Config.quiet_execution)
-                        System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
-                    return;
-                };
+                final Environment env = bind_list(state,z,null);
                 final Tree ans = answer;
                 merge = Streaming.streamify(merge);
-                Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,
+                Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,null,
                    new Function() {
                         public MRData eval ( final MRData state ) {
                             MRData a = Evaluator.evaluator.evalE(ans,env);
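
The incr(zero,lambda(state,merge),lambda(state,answer)) form above drives the
whole incremental evaluation: the streaming runtime starts from the zero
state, merges each batch into the state, and prints the answer after every
batch. A schematic of that loop (hypothetical generic types; the real loop
runs inside the Spark streaming evaluator):

    import java.util.function.BiFunction;
    import java.util.function.Function;

    class IncrLoopSketch {
        static <S,D,A> void incrLoop ( S zero, BiFunction<S,D,S> merge,
                                       Function<S,A> answer, Iterable<D> batches ) {
            S state = zero;
            for ( D delta: batches ) {
                state = merge.apply(state,delta);            // fold the batch into the state
                System.out.println(answer.apply(state));     // emit the incremental answer
            }
        }
    }
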
diff --git a/core/src/main/java/org/apache/mrql/Translator.gen b/core/src/main/java/org/apache/mrql/Translator.gen
index 3be0d9f..2d9d2fe 100644
--- a/core/src/main/java/org/apache/mrql/Translator.gen
+++ b/core/src/main/java/org/apache/mrql/Translator.gen
@@ -178,7 +178,8 @@
     static Trees plans_with_distributed_lambdas
         = #[MapReduce,MapAggregateReduce,MapCombineReduce,FroupByJoin,Aggregate,
             MapReduce2,MapCombineReduce2,MapAggregateReduce2,MapJoin,MapAggregateJoin,
-            CrossProduct,CrossAggregateProduct,cMap,AggregateMap,BSP,GroupByJoin];
+            CrossProduct,CrossAggregateProduct,cMap,AggregateMap,BSP,GroupByJoin,
+            OuterMerge,RightOuterMerge];
 
     static Trees algebraic_operators
         = #[mapReduce,mapReduce2,cmap,join,groupBy,orderBy,aggregate,map,filter,repeat,closure];
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 7c80240..f6769f4 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -32,12 +32,11 @@
 
     public static Tree make_persistent_type ( Tree tp ) {
         match tp {
-        case `f(...al):
-            Trees bs = #[];
-            for ( Tree a: al )
-                bs = bs.append(make_persistent_type(a));
-            String g = persistent_collection(f);
-            return #<`g(...bs)>;
+        case `T(`t):
+            if (!is_collection(T))
+                fail;
+            String S = persistent_collection(T);
+            return #<`S(`t)>;
         };
         return tp;
     }
@@ -1200,6 +1199,32 @@
             };
         case cstep(`x):  // used in QueryPlan for closure
             return type_inference(x);
+        case outerMerge(lambda(`v,`b),`s,`d):
+            Tree ts = type_inference(s);
+            if (unify(type_inference(d),ts) == null)
+                type_error(e,"Wrong outerMerge");
+            match ts {
+            case `S(tuple(`k,`tp)):
+                if (!is_collection(S))
+                    fail;
+                type_env.insert(v.toString(),#<tuple(`tp,`tp)>);
+                type_inference(b);
+                return ts;
+            };
+            type_error(e,"Wrong outerMerge");
+        case rightOuterMerge(lambda(`v,`b),`s,`d):
+            Tree ts = type_inference(s);
+            if (unify(type_inference(d),ts) == null)
+                type_error(e,"Wrong rightOuterMerge");
+            match ts {
+            case `S(tuple(`k,`tp)):
+                if (!is_collection(S))
+                    fail;
+                type_env.insert(v.toString(),#<tuple(`tp,`tp)>);
+                type_inference(b);
+                return ts;
+            };
+            type_error(e,"Wrong rightOuterMerge");
         case if(`p,`x,`y):
             if (unify(type_inference2(p),#<bool>) == null)
                 type_error(e,"Expected a boolean predicate in if-then-else: "+print_query(p));
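
Both typing rules above require the state and the delta to have the same
collection type {(k,v)} and type the merge body with the lambda variable
bound to a pair of two values (v,v). Restated as a Java generic signature
(a sketch, not MRQL API):

    import java.util.Map;
    import java.util.function.BinaryOperator;

    // outerMerge/rightOuterMerge: (v x v -> v) x {(k,v)} x {(k,v)} -> {(k,v)}
    interface MergeOp {
        <K,V> Iterable<Map.Entry<K,V>> apply ( BinaryOperator<V> merge,
                                               Iterable<Map.Entry<K,V>> state,
                                               Iterable<Map.Entry<K,V>> delta );
    }
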
diff --git a/pom.xml b/pom.xml
index bd9079b..cb1ac62 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
     <hadoop.version>1.2.1</hadoop.version>
     <yarn.version>2.7.1</yarn.version>
     <hama.version>0.7.0</hama.version>
-    <spark.version>1.6.0</spark.version>
+    <spark.version>1.6.2</spark.version>
     <scala.version>2.10</scala.version>
     <flink.version>1.0.2</flink.version>
     <skipTests>true</skipTests>
diff --git a/queries/factorization3.mrql b/queries/factorization3.mrql
index de61b5d..3aca55b 100644
--- a/queries/factorization3.mrql
+++ b/queries/factorization3.mrql
@@ -44,7 +44,7 @@
 
 // cell-wise addition:
 macro Cadd ( X, Y ) {
-  select ( x*y, i, j )
+  select ( x+y, i, j )
     from (x,i,j) in X, (y,i,j) in Y
 };
 
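The Cadd fix corrects a copy-paste error: the cell-wise addition macro was
multiplying matching cells instead of adding them. Note that the
comprehension joins on matching (i,j), so Cadd is an inner join over cells:
a cell present in only one matrix is dropped. A minimal sketch of what the
corrected macro computes over sparse matrices (plain Java, hypothetical
representation keyed by an "i,j" string):

    import java.util.HashMap;
    import java.util.Map;

    class CaddSketch {
        static Map<String,Double> cadd ( Map<String,Double> X, Map<String,Double> Y ) {
            Map<String,Double> Z = new HashMap<String,Double>();
            for ( Map.Entry<String,Double> e: X.entrySet() ) {
                Double y = Y.get(e.getKey());
                if (y != null)                          // inner join on (i,j)
                    Z.put(e.getKey(), e.getValue()+y);
            }
            return Z;
        }
    }
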
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index 60a9bb3..616da62 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -19,10 +19,7 @@
 
 import org.apache.mrql.gen.*;
 import java_cup.runtime.Scanner;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.Iterator;
+import java.util.*;
 import java.io.*;
 import java.util.Enumeration;
 import org.apache.log4j.*;
@@ -59,11 +56,14 @@
     final static Bag empty_bag = new Bag();
     // an HDFS tmp file used to hold the data source directory information in distributed mode
     final static String data_source_dir_name = "tmp/"+System.getenv("USER")+"_data_source_dir.txt";
-    private static boolean first_time = true; // true at the beginning of a query execution
+    // holds the cached RDDs that need to be unpersisted
+    private static ArrayList<JavaRDD<MRData>> cached_rdds = new ArrayList<JavaRDD<MRData>>();
+    static Environment global_rdds = null;
 
     /** initialize the Spark evaluator */
     final public void init ( Configuration conf ) {
         Config.spark_mode = true;
+        global_rdds = null;
         SparkConf spark_conf = new SparkConf();
         spark_conf.setAppName("MRQL");
         if (Config.hadoop_mode && Config.local_mode) {
@@ -100,7 +100,6 @@
     }
 
     final public void initialize_query () {
-        first_time = true;
         Plan.distribute_compiled_arguments(Plan.conf);
         if (spark_context != null && Config.compile_functional_arguments)
             spark_context.addJar(Plan.conf.get("mrql.jar.path"));
@@ -125,9 +124,15 @@
         return SparkGeneratorInputFormat.class;
     }
 
+    final public static void GC_cached_rdds () {
+        for ( JavaRDD<MRData> rdd: cached_rdds )
+            rdd.unpersist();
+        cached_rdds = new ArrayList<JavaRDD<MRData>>();
+    }
+
     /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
-    final public void streaming ( Tree plan, Environment env, org.apache.mrql.Function f ) {
-        SparkStreaming.evaluate(plan,env,f);
+    final public void streaming ( Tree plan, Environment env, Environment dataset_env, org.apache.mrql.Function f ) {
+        SparkStreaming.evaluate(plan,env,dataset_env,f);
     }
 
     /** used by the master to send parsing details (eg, record types) to workers */
@@ -179,6 +184,7 @@
                         return new Tuple2<MRContainer,MRContainer>(zero,new MRContainer(value));
                     }
                 }).saveAsHadoopFile(file,MRContainer.class,MRContainer.class,SequenceFileOutputFormat.class);
+            rdd.unpersist();
         } else super.dump(file,type,data);
     }
 
@@ -206,23 +212,14 @@
                 rdd.saveAsTextFile(file);
             };
             Config.max_bag_size_print = ps;
+            rdd.unpersist();
         } else super.dump_text(file,type,data);
     }
 
-    private static void set_global_env ( Environment env ) {
-        if (true || first_time) {  // Bug: it doesn't need to be executed more than once per worker
-            // pass the global bindings to workers
-            set_global_bindings(env);
-            first_time = false;
-        }
-    }
-
     private static Function2<MRData,MRData,MRData> accumulator ( final Tree acc_fnc, final Environment env ) {
-        final Environment master_env = global_env;
         final org.apache.mrql.Function f = evalF(acc_fnc,env);
         return new Function2<MRData,MRData,MRData>() {
             public MRData call ( MRData x, MRData y ) {
-                set_global_env(master_env);
                 return f.eval(new Tuple(x,y));
             }
         };
@@ -243,27 +240,27 @@
         MRData z = evalE(zero,env);
         match plan {
         case AggregateMap(`m,`acc,_,`s):
-            return evalD(#<cMap(`m,`s)>,env)
+            return evalD(#<cMap(`m,`s)>,env,null)
                      .aggregate(z,accumulator(acc,env),f2);
         case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
             if (acc.equals(#<null>))
                 fail;
-            return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
+            return evalD(#<MapReduce(`m,`r,`s,`o)>,env,null)
                      .aggregate(z,accumulator(acc,env),f2);
         case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
             if (acc.equals(#<null>))
                 fail;
-            return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
+            return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,null)
                      .aggregate(z,accumulator(acc,env),f2);
         case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
             if (acc.equals(#<null>))
                 fail;
-            return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
+            return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,null)
                      .aggregate(z,accumulator(acc,env),f2);
         case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
             if (acc.equals(#<null>))
                 fail;
-            return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
+            return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,null)
                      .aggregate(z,accumulator(acc,env),f2);
         };
         throw new Error("Unrecognized aggregation: "+plan);
@@ -276,13 +273,15 @@
             int limit = ((MR_int)evalE(num,env)).get();
             MR_rdd[] s = new MR_rdd[vs.length()];
             for ( int i = 0; i < vs.length(); i++ )
-                s[i] = new MR_rdd(eval(ss.nth(i),env).cache());
+                s[i] = new MR_rdd(eval(ss.nth(i),env,(Environment)null).cache());
             for ( int n = 0; n < limit; n++ ) {
-                Environment nenv = env;
+                Environment nenv = null;
                 for ( int i = 0; i < vs.length(); i ++ )
                     nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
-                for ( int i = 0; i < vs.length(); i ++ )
-                    s[i] = new MR_rdd(eval(bs.nth(i),nenv).cache());
+                for ( int i = 0; i < vs.length(); i ++ ) {
+                    s[i].rdd().unpersist();
+                    s[i] = new MR_rdd(eval(bs.nth(i),env,nenv).cache());
+                }
             };
             Tuple t = new Tuple(vs.length());
             for ( int i = 0; i < vs.length(); i++ )
@@ -304,57 +303,18 @@
             });
     }
 
-    // Return the RDD elements at the given position
-    private static Iterator<MRData> partition ( final JavaRDD<MRData> rdd, final int position ) {
-        /* Doesn't work (needs the right TaskContext)
-        TaskContext context = new TaskContext(0,0,(long)0,Config.local_mode,null);
-        List<Partition> ps = rdd.splits();
-        return rdd.iterator(ps.get(position),context);
-        */
-        return rdd.mapPartitionsWithIndex(new Function2<Integer,Iterator<MRData>,Iterator<MRData>>() {
-                public Iterator<MRData> call ( Integer partition, Iterator<MRData> values ) {
-                    if (partition == position)
-                        return values;
-                    else return new ArrayList<MRData>().iterator();
-                }
-            },true).collect().iterator();
-    }
-
-    final static int MAX_CACHE_SIZE = 1000;
-
     /** Convert a Spark RDD into a lazy bag
      * @param rdd the Spark RDD
      * @return a lazy bag that contains all RDD elements
      */
     public static Bag bag ( final JavaRDD<MRData> rdd ) throws IOException {
-        final JavaRDD<MRData> rd = rdd.cache();
-        if (rd.count() <= MAX_CACHE_SIZE) {  // small RDD
-            final Iterator<MRData> i = rd.collect().iterator();
-            return new Bag(new BagIterator() {
-                    public MRData next () {
-                        return i.next();
-                    }
-                    public boolean hasNext () {
-                        return i.hasNext();
-                    }
-                });
-        };
-        // return the RDD elements lazily, one partition at a time
-        final int splits = rd.splits().size();
+        final Iterator<MRData> i = rdd.toLocalIterator();
         return new Bag(new BagIterator() {
-                Iterator<MRData> i = null;
-                int c = 0;
                 public MRData next () {
                     return i.next();
                 }
                 public boolean hasNext () {
-                    do {
-                        if (i != null && i.hasNext()) 
-                            return true;
-                        if (c >= splits)
-                            return false;
-                        i = partition(rd,c++);
-                    } while (true);
+                    return i.hasNext();
                 }
             });
     }
@@ -370,15 +330,6 @@
         }
     }
 
-    private static JavaRDD<MRData> materialize ( JavaRDD<MRData> rdd ) {
-        return rdd.map(new Function<MRData,MRData>() {
-                public MRData call ( MRData value ) {
-                    value.materializeAll();
-                    return value;
-                };
-            }).cache();
-    }
-
     private final static Function<MRData,MRData> get_first
         = new Function<MRData,MRData>() {
                    public MRData call ( MRData value ) {
@@ -395,9 +346,10 @@
     final public DataSet eval ( final Tree e,
                                 final Environment env,
                                 final String counter ) {
-        JavaRDD<MRData> rd = eval(e,env);
+        JavaRDD<MRData> rd = eval(e,env,(Environment)null);
         long count = 0;
         rd = rd.cache();
+        cached_rdds.add(rd);
         if (!counter.equals("-")) {
             final Accumulator<Integer> c = spark_context.intAccumulator(0);
             rd.foreach(new VoidFunction<MRData>() {
@@ -415,14 +367,15 @@
     /** Evaluate an MRQL physical plan using Spark and print tracing info
      * @param e the physical plan
      * @param env contains bindings from variables to values (MRData)
+     * @param rdd_env contains bindings from variables to RDDs
      * @return a Spark RDD
      */
-    final public JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
+    final public JavaRDD<MRData> eval ( final Tree e, final Environment env, final Environment rdd_env ) {
         if (Config.trace_execution) {
             tab_count += 3;
             System.out.println(tabs(tab_count)+print_query(e));
         };
-        final JavaRDD<MRData> res = evalD(e,env);
+        final JavaRDD<MRData> res = evalD(e,env,rdd_env);
         if (Config.trace_execution) 
             try {
                 System.out.println(tabs(tab_count)+"-> "+res.take(Config.max_bag_size_print));
@@ -435,11 +388,11 @@
 
     /* convert an MRQL lambda to a Spark Function */
     private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
-        final Environment master_env = global_env;
+        final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
         final org.apache.mrql.Function f = evalF(fnc,env);
         return new FlatMapFunction<MRData,MRData>() {
             public Iterable<MRData> call ( MRData value ) {
-                set_global_env(master_env);
+                global_env = broadcast_env.value();
                 return (Bag)f.eval(value);
             }
         };
@@ -449,17 +402,18 @@
     private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
         match o {
         case true:   // the result must be sorted
-            return s.groupBy(get_first)
-                .sortByKey()
-                .map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData>() {
-                        public MRData call ( Tuple2<MRData,Iterable<MRData>> value ) {
-                            return new Tuple(value._1,bag(value._2));
+            return s.mapToPair(new PairFunction<MRData,MRData,MRData>() {
+                    public Tuple2<MRData,MRData> call ( MRData value ) {
+                        Tuple t = (Tuple)value;
+                        return new Tuple2(t.first(),t.second());
+                    }
+                })
+                .sortByKey(true)
+                .map(new Function<Tuple2<MRData,MRData>,MRData>() {
+                        public MRData call ( Tuple2<MRData,MRData> value ) {
+                            return new Tuple(value._1,new Bag(value._2));
                         }})
-                .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
-                        public MRData call ( MRData value ) {
-                            return ((Tuple)value).second();
-                        }
-                    });
+                .flatMap(cmap_fnc(fnc,env));
         };
         return s.groupBy(get_first)
             .map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData> () {
@@ -479,10 +433,10 @@
     }
 
     static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
-        final Environment master_env = global_env;
+        final Broadcast<Environment> broadcast_env = spark_context.broadcast(global_env);
         return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
                 public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
-                    set_global_env(master_env);
+                    global_env = broadcast_env.value();
                     return value._2.data();
                 }
             });
@@ -506,10 +460,8 @@
     }
 
     private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
-        final Environment master_env = global_env;
         return new FlatMapFunction<Iterator<MRData>,MRData>() {
                   public Iterable<MRData> call ( final Iterator<MRData> i ) {
-                      set_global_env(master_env);
                       return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
                               public MRData eval ( MRData x ) {
                                   final MRData key = ((Tuple)x).first();
@@ -535,77 +487,93 @@
         };
     }
 
-    private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
-        Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
-        for ( Tuple2<MRData,MRData> t: values ) {
-            Bag entries = built_table.get(t._1);
-            built_table.put(t._1,
-                            (entries == null)
-                            ? (new Bag(t._2))
-                            : entries.add_element(t._2));
-        };
-        return built_table;
-    }
-
     /** Evaluate MRQL physical operators using Spark
      * @param e the physical plan
-     * @param env contains bindings fro variables to values (MRData)
+     * @param env contains bindings from variables to values (MRData)
+     * @param rdd_env contains bindings from variables to RDDs
      * @return a Spark RDD
      */
-    final public JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
-        final Environment master_env = global_env;
+    final public JavaRDD<MRData> evalD ( final Tree e, final Environment env, final Environment rdd_env ) {
         try {
             match e {
             case MapAggregateReduce(`m,`r,null,_,`s,`o):
-                return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
+                return evalD(#<MapReduce(`m,`r,`s,`o)>,env,rdd_env);
             case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
-                return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
+                return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env,rdd_env);
             case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
-                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
+                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env);
             case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
-                return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
+                return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env,rdd_env);
             case cMap(`f,`s):
-                return eval(s,env).flatMap(cmap_fnc(f,env));
+                return eval(s,env,rdd_env).flatMap(cmap_fnc(f,env));
             case MapReduce(`m,`r,`s,`o):
-                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
+                return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env)),r,env,o);
             case MapCombineReduce(`m,`c,`r,`s,`o):
-                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
+                return groupBy(eval(s,env,rdd_env).flatMap(cmap_fnc(m,env))
                                .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
             case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
-                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
+                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env,rdd_env).mapPartitions(combiner_fnc(evalF(c,env)));
             case CrossProduct(`mx,`my,`r,`x,`y):
                 final org.apache.mrql.Function fr = evalF(r,env);
-                return eval(x,env)
+                return eval(x,env,rdd_env)
                     .flatMap(cmap_fnc(mx,env))
-                    .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
+                    .cartesian(eval(y,env,rdd_env).flatMap(cmap_fnc(my,env)))
                     .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
                             public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
-                                set_global_env(master_env);
                                 return (Bag)fr.eval(new Tuple(value._1,value._2));
                             }
                         });
             case MapReduce2(`mx,`my,`r,`x,`y,`o):
-                final org.apache.mrql.Function fx = evalF(mx,env);
-                final org.apache.mrql.Function fy = evalF(my,env);
                 final org.apache.mrql.Function fr = evalF(r,env);
+                boolean sfx = false;
+                match mx {
+                case compiled(`fmx,lambda(`vx,bag(`mex))):
+                    sfx = true;
+                    mx = #<lambda(`vx,`mex)>;
+                case lambda(`vx,bag(`mex)):
+                    sfx = true;
+                    mx = #<lambda(`vx,`mex)>;
+                };
+                final org.apache.mrql.Function fx = evalF(mx,env);
                 JavaPairRDD<MRData,MRData> xs
-                    = eval(x,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                    = (sfx)
+                    ? eval(x,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() {
+                            public Tuple2<MRData,MRData> call ( MRData value ) {
+                                Tuple t = (Tuple)fx.eval(value);
+                                return new Tuple2(t.first(),t.second());
+                            }
+                        })
+                    : eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
                             public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-                                set_global_env(master_env);
                                 return joinIterator(((Bag)fx.eval(value)).iterator());
                             }
                         });
+                boolean sfy = false;
+                match my {
+                case compiled(`fmy,lambda(`vy,bag(`mey))):
+                    sfy = true;
+                    my = #<lambda(`vy,`mey)>;
+                case lambda(`vy,bag(`mey)):
+                    sfy = true;
+                    my = #<lambda(`vy,`mey)>;
+                };
+                final org.apache.mrql.Function fy = evalF(my,env);
                 JavaPairRDD<MRData,MRData> ys
-                    = eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                    = (sfy)
+                    ? eval(y,env,rdd_env).mapToPair(new PairFunction<MRData,MRData,MRData>() {
+                            public Tuple2<MRData,MRData> call ( MRData value ) {
+                                Tuple t = (Tuple)fy.eval(value);
+                                return new Tuple2(t.first(),t.second());
+                            }
+                        })
+                    : eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
                             public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-                                set_global_env(master_env);
                                 return joinIterator(((Bag)fy.eval(value)).iterator());
                             }
                         });
 		return xs.cogroup(ys)
                     .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
                             public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
-                                set_global_env(master_env);
                                 return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
                             }
                         });
@@ -623,9 +591,8 @@
                 final MRData one = new MR_byte(1);
                 final MRData two = new MR_byte(2);
                 final JavaPairRDD<MRData,MRData> xs
-                    = eval(x,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                    = eval(x,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
                             public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-                                set_global_env(master_env);
                                 return new Iterable<Tuple2<MRData,MRData>>() {
                                     public Iterator<Tuple2<MRData,MRData>> iterator() {
                                         return new Iterator<Tuple2<MRData,MRData>>() {
@@ -645,9 +612,8 @@
                             }
                         });
                 final JavaPairRDD<MRData,MRData> ys
-                    = eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                    = eval(y,env,rdd_env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
                             public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-                                set_global_env(master_env);
                                 return new Iterable<Tuple2<MRData,MRData>>() {
                                     public Iterator<Tuple2<MRData,MRData>> iterator() {
                                         return new Iterator<Tuple2<MRData,MRData>>() {
@@ -666,7 +632,7 @@
                                 };
                             }
                         });
-                return xs.union(ys).groupByKey(Config.nodes)
+                return xs.union(ys).groupByKey()
                     .mapPartitions(new FlatMapFunction<Iterator<Tuple2<MRData,Iterable<MRData>>>,MRData>() {
                         public Iterable<MRData> call ( final Iterator<Tuple2<MRData,Iterable<MRData>>> value ) {
                             Bag xb = new Bag();
@@ -688,22 +654,116 @@
                                 }
                             };
                         }
+                        });
+            case OuterMerge(`merge,`state,`data):
+                final org.apache.mrql.Function fm = evalF(merge,env);
+                JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
+                    .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+                                return joinIterator(i);
+                            }
+                        },true);   // do not repartition the state S
+                JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env)
+                    .mapToPair(new PairFunction<MRData,MRData,MRData>() {
+                            public Tuple2<MRData,MRData> call ( MRData value ) {
+                                Tuple t = (Tuple)value;
+                                return new Tuple2(t.first(),t.second());
+                            }
                         });
+                JavaRDD<MRData> res = S.cogroup(ds)
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+                                final Iterator<MRData> ix = value._2._1.iterator();
+                                final Iterator<MRData> iy = value._2._2.iterator();
+                                ArrayList<MRData> a = new ArrayList<MRData>();
+                                if (ix.hasNext())
+                                    if (iy.hasNext())
+                                        a.add(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))));
+                                    else a.add(new Tuple(value._1,ix.next()));
+                                else if (iy.hasNext())
+                                    a.add(new Tuple(value._1,iy.next()));
+                                return a;
+                            }
+                        }).cache();
+                cached_rdds.add(res);
+                return res;
+            case RightOuterMerge(`merge,`state,`data):
+                final org.apache.mrql.Function fm = evalF(merge,env);
+                JavaPairRDD<MRData,MRData> S = eval(state,env,rdd_env)
+                    .mapPartitionsToPair(new PairFlatMapFunction<Iterator<MRData>,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( final Iterator<MRData> i ) {
+                                return joinIterator(i);
+                            }
+                        },true);   // do not repartition the state S
+                JavaPairRDD<MRData,MRData> ds = eval(data,env,rdd_env)
+                    .mapToPair(new PairFunction<MRData,MRData,MRData>() {
+                            public Tuple2<MRData,MRData> call ( MRData value ) {
+                                Tuple t = (Tuple)value;
+                                return new Tuple2(t.first(),t.second());
+                            }
+                        });
+                JavaRDD<MRData> res = S.cogroup(ds)
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
+                                final Iterator<MRData> ix = value._2._1.iterator();
+                                final Iterator<MRData> iy = value._2._2.iterator();
+                                return (ix.hasNext())
+                                    ? ((iy.hasNext())
+                                       ? new Bag(new Tuple(value._1,fm.eval(new Tuple(ix.next(),iy.next()))))
+                                       : new Bag())
+                                    : ((iy.hasNext())
+                                       ? new Bag(new Tuple(value._1,iy.next()))
+                                       : new Bag());
+                            }
+                        }).cache();
+                cached_rdds.add(res);
+                return res;
+            case MapJoin(lambda(`vx,bag(`mx)),lambda(`vy,bag(`my)),`r,`x,`y):
+                final org.apache.mrql.Function fx = evalF(#<lambda(`vx,`mx)>,env);
+                final org.apache.mrql.Function fy = evalF(#<lambda(`vy,`my)>,env);
+                final org.apache.mrql.Function fr = evalF(r,env);
+                final Broadcast<Map<MRData,MRData>> ys
+                    = spark_context.broadcast(eval(y,env,rdd_env)
+                         .mapToPair(new PairFunction<MRData,MRData,MRData>() {
+                            public Tuple2<MRData,MRData> call ( MRData value ) {
+                                Tuple t = (Tuple)fy.eval(value);
+                                return new Tuple2(t.first(),t.second());
+                            }
+                        }).collectAsMap());
+                return eval(x,env,rdd_env).mapPartitions(new FlatMapFunction<Iterator<MRData>,MRData>() {
+                        public Iterable<MRData> call ( final Iterator<MRData> i ) {
+                            final Map<MRData,MRData> m = ys.value();
+                            return new Iterable<MRData>() {
+                                public Iterator<MRData> iterator() {
+                                    return new Iterator<MRData> () {
+                                        public MRData next () {
+                                            final Tuple p = (Tuple)fx.eval(i.next());
+                                            final MRData pd = m.get(p.first());
+                                            return ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
+                                        }
+                                        public boolean hasNext () {
+                                            return i.hasNext();
+                                        }
+                                        public void remove () {}
+                                    };
+                                }
+                            };
+                        }
+                    },true);
             case MapJoin(`mx,`my,`r,`x,`y):
                 final org.apache.mrql.Function fx = evalF(mx,env);
                 final org.apache.mrql.Function fy = evalF(my,env);
                 final org.apache.mrql.Function fr = evalF(r,env);
-                final Broadcast<List<Tuple2<MRData,MRData>>> ys
-                    = spark_context.broadcast(eval(y,env).flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                                public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-                                    set_global_env(master_env);
-                                    return joinIterator(((Bag)fy.eval(value)).iterator());
-                                }
-                            }).collect());
-                return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
-                        final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
+                final Broadcast<Map<MRData,MRData>> ys
+                    = spark_context.broadcast(eval(y,env,rdd_env)
+                         .flatMapToPair(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                                return joinIterator(((Bag)fy.eval(value)).iterator());
+                            }
+                        }).collectAsMap());
+                return eval(x,env,rdd_env).flatMap(new FlatMapFunction<MRData,MRData>() {
                         public Iterable<MRData> call ( MRData value ) {
-                            set_global_env(master_env);
+                            final Map<MRData,MRData> m = ys.value();
                             final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
                             return new Iterable<MRData>() {
                                 public Iterator<MRData> iterator() {
@@ -718,7 +778,7 @@
                                                 return true;
                                             while (i.hasNext()) {
                                                 p = (Tuple)i.next();
-                                                MRData pd = built_table.get(p.first());
+                                                MRData pd = m.get(p.first());
                                                 Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
                                                 ix = bb.iterator();
                                                 if (ix.hasNext())
@@ -749,19 +809,18 @@
                                                               MRContainer.class,MRContainer.class,
                                                               Config.nodes));
             case Merge(`x,`y):
-                return eval(x,env).union(eval(y,env));
+                return eval(x,env,rdd_env).union(eval(y,env,rdd_env));
             case Repeat(lambda(`v,`b),`s,`n):
                 int max_num = ((MR_int)evalE(n,env)).get();
                 JavaRDD<MRData> rd;
-                JavaRDD<MRData> res = eval(s,env);
-                //res = materialize(res);
+                JavaRDD<MRData> res = eval(s,env,rdd_env).cache();
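+                // cache the initial step; each iteration below unpersists the previous
+                // generation so that storage stays bounded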
                 int i = 0;
                 boolean cont = true;
                 do {
-                    rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
-                    res = rd.map(get_first);
-                    //res = materialize(res);
-		    long size = res.count();
+                    rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache();
+                    res.unpersist();
+                    res = rd.map(get_first).cache();
+                    long size = rd.count();
                     Integer true_results
                         = rd.aggregate(new Integer(0),
                                        new Function2<Integer,MRData,Integer>() {
@@ -778,23 +837,28 @@
 			if (size == 0)
 			    System.err.println("Repeat #"+i);
 			else System.err.println("Repeat #"+i+": "+true_results+" true results out of "+size);
+                    rd.unpersist();
                 } while (cont);
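+                // keep the final result cached; it is released later by GC_cached_rdds()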
+                cached_rdds.add(res);
                 return res;
             case Closure(lambda(`v,`b),`s,`m):
                 int max_num = ((MR_int)evalE(m,env)).get();
-                JavaRDD<MRData> res = eval(s,env).cache();
+                JavaRDD<MRData> res = eval(s,env,rdd_env).cache();
                 long n = 0;
                 long old = 0;
                 int i = 0;
                 boolean cont = true;
                 do {
-                    res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
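+                    // materialize the new iteration in rd before unpersisting the
+                    // previous result, so at most two generations are cached at once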
+                    JavaRDD<MRData> rd = eval(b,env,new Environment(v.toString(),new MR_rdd(res),rdd_env)).cache();
+                    res.unpersist();
+                    res = rd;
                     old = n;
                     n = res.count();
                     i++;
                     if (!Config.testing)
                         System.err.println("Repeat #"+i+": "+(old-n)+" new records");
                 } while (old < n && i < max_num);
+                cached_rdds.add(res);
                 return res;
             case Generator(`min,`max,`size):
                 DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
@@ -815,20 +879,23 @@
                 else if (x instanceof Bag)
                     ((Bag)x).materialize();
                 new_distributed_binding(v.toString(),x);
-                return eval(body,new Environment(v.toString(),x,env));
+                return eval(body,new Environment(v.toString(),x,env),rdd_env);
             case let(`v,`u,`body):
                 MRData val = evalE(u,env);
                 Interpreter.new_global_binding(v.toString(),val);
-                return eval(body,new Environment(v.toString(),val,env));
+                return eval(body,new Environment(v.toString(),val,env),rdd_env);
             case Let(`v,`u,`body):
-                return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env).cache()),env));
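+                // cache the bound RDD so the body can reuse it, and defer its unpersist
+                // to GC_cached_rdds() since the body's result may still depend on it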
+                JavaRDD<MRData> val = eval(u,env,rdd_env).cache();
+                JavaRDD<MRData> rd = eval(body,env,new Environment(v.toString(),new MR_rdd(val),rdd_env));
+                cached_rdds.add(val);
+                return rd;
             case If(`c,`x,`y):
                 if (((MR_bool)evalE(c,env)).get())
-                    return eval(x,env);
-                else return eval(y,env);
+                    return eval(x,env,rdd_env);
+                else return eval(y,env,rdd_env);
             case trace(`msg,`tp,`x):
                 long n = pre_trace(((MR_string)evalE(msg,env)).get());
-                JavaRDD<MRData> ds = evalD(x,env);
+                JavaRDD<MRData> ds = evalD(x,env,rdd_env);
                 trace(n,tp,new Bag(ds.take(Config.max_bag_size_print)));
                 return ds;
             case apply(`f,`arg):
@@ -839,7 +906,13 @@
             case `v:
                 if (!v.is_variable())
                     fail;
-                MRData x = variable_lookup(v.toString(),env);
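+                // lookup order: the per-batch RDD bindings first, then the stream-global
+                // RDDs, then the global environment, and finally the local environment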
+                MRData x = variable_lookup(v.toString(),rdd_env);
+                if (x != null && x instanceof MR_rdd)
+                    return ((MR_rdd)x).rdd();
+                x = variable_lookup(v.toString(),global_rdds);
+                if (x != null && x instanceof MR_rdd)
+                    return ((MR_rdd)x).rdd();
+                x = variable_lookup(v.toString(),global_env);
                 if (x != null)
                     if (x instanceof MR_dataset)
                         return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
@@ -855,12 +928,10 @@
                         l.add(x);
                         return spark_context.parallelize(l);
                     };
-                x = variable_lookup(v.toString(),global_env);
+                x = variable_lookup(v.toString(),env);
                 if (x != null)
                     if (x instanceof MR_dataset)
                         return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
-                    else if (x instanceof MR_rdd)
-                        return ((MR_rdd)x).rdd();
                     else if (x instanceof Bag) {
                         ArrayList<MRData> l = new ArrayList<MRData>();
                         for ( MRData a: (Bag)x )
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
index d0ec379..1ae28b1 100644
--- a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -29,6 +29,7 @@
 import scala.collection.Seq;
 import scala.collection.JavaConversions;
 import scala.runtime.AbstractFunction2;
+import org.apache.spark.storage.RDDInfo;
 
 
 /** Evaluates physical plans in Apache Spark stream mode */
@@ -74,28 +75,29 @@
         return plan;
     }
 
-    private final static Evaluator ef = Evaluator.evaluator;
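+    // downcast to SparkEvaluator so that its RDD-returning eval methods are available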
+    private final static SparkEvaluator ef = (SparkEvaluator)Evaluator.evaluator;
 
-    /** bind the pattern variables to values */
-    private static Environment bind_list ( Tree pattern, Tree src, Environment env ) {
+    /** bind each pattern variable to the RDD of its source expression */
+    private static void bind_list ( Tree pattern, Tree src, Environment env, Environment rdd_env ) {
         match pattern {
         case tuple(...ps):
             int i = 0;
             match src {
             case tuple(...es):
                 for ( Tree p: ps )
-                    env = bind_list(p,es.nth(i++),env);
-                return env;
+                    bind_list(p,es.nth(i++),env,rdd_env);
             }
         };
-        MRData value = ef.evalE(src,env);
-        env.replace(pattern.toString(),value);
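+        // the pattern variable is now bound to the RDD of its source, and the binding
+        // is kept in global_rdds so that it survives across stream batches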
+        MRData value = new MR_rdd(ef.eval(src,env,rdd_env));
+        if (variable_lookup(pattern.toString(),global_rdds) == null)
+            global_rdds = new Environment(pattern.toString(),value,global_rdds);
+        else global_rdds.replace(pattern.toString(),value);
         if (repeat_variables.member(pattern))
             Interpreter.new_distributed_binding(pattern.toString(),value);
-        return env;
     }
 
-    private static void stream_processing ( final Tree plan, final Environment env, final Function f ) {
+    private static void stream_processing ( final Tree plan, final Environment env,
+                                            final Environment dataset_env, final Function f ) {
         if (streams.size() == 0)
             throw new Error("No input streams in query");
         ArrayList<DStream<?>> rdds = new ArrayList<DStream<?>>();
@@ -106,7 +108,7 @@
             new AbstractFunction2<Seq<RDD<?>>,Time,RDD<MRData>>() {
             public RDD<MRData> apply ( Seq<RDD<?>> rdds, Time tm ) {
                 long t = System.currentTimeMillis();
-                Environment new_env = env;
+                Environment rdd_env = dataset_env;
                 int i = 0;
                 for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) ) {
                     try {
@@ -115,17 +117,25 @@
                     } catch (Exception ex) {
                         return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
                     };
-                    new_env = new Environment(vars.get(i++),new MR_rdd(new JavaRDD(rdd,classtag)),new_env);
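+                    // each input stream's batch RDD is made visible both through
+                    // global_rdds and through the per-batch environment rdd_env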
+                    MRData d = new MR_rdd(new JavaRDD(rdd,classtag));
+                    global_rdds = new Environment(vars.get(i),d,global_rdds);
+                    rdd_env = new Environment(vars.get(i++),d,rdd_env);
                 };
                 match plan {
                 case lambda(`pat,`e):
-                    new_env = bind_list(pat,e,new_env);
+                    bind_list(pat,e,env,rdd_env);
                     f.eval(null);
                 case _: // non-incremental streaming
-                    f.eval(ef.evalE(plan,new_env));
+                    f.eval(ef.evalE(plan,env));
                 };
                 if (!Config.quiet_execution)
                     System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
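+                // release this batch's input RDDs and every RDD cached while evaluating
+                // it; anything still cached afterwards is reported as a potential leak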
+                for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) )
+                    rdd.unpersist(true);
+                GC_cached_rdds();
+                System.gc();
+                for ( RDDInfo rdi: SparkEvaluator.spark_context.sc().getRDDStorageInfo() )
+                    System.err.println("Warning: cached RDD "+rdi);
 		if (++tries >= Config.stream_tries) {  // for performance experiments only
 		    tries = 0;
 		    SparkEvaluator.stream_context.stop(true,true);
@@ -138,16 +148,16 @@
     }
 
     /** evaluate plan in stream mode: evaluate each batch of data and apply the function f */
-    final public static void evaluate ( Tree plan, Environment env, Function f ) {
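+    // dataset_env carries RDD bindings from an enclosing evaluation; incremental plans
+    // (the lambda case) pass null and keep their state in global_rdds instead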
+    final public static void evaluate ( Tree plan, Environment env, Environment dataset_env, Function f ) {
         streams = new ArrayList<JavaInputDStream<MRData>>();
         stream_names = new ArrayList<String>();
         match plan {
         case lambda(`p,`b):
             b = get_streams(b,env);
-            stream_processing(#<lambda(`p,`b)>,env,f);
+            stream_processing(#<lambda(`p,`b)>,env,null,f);
         case _:  // non-incremental streaming
             plan = get_streams(plan,env);
-            stream_processing(plan,env,f);
+            stream_processing(plan,env,dataset_env,f);
         };
         stream_context.start();
         stream_context.awaitTermination();