[MRQL-98] Improve Data Serialization in Spark Evaluation
diff --git a/core/src/main/java/org/apache/mrql/Bag.java b/core/src/main/java/org/apache/mrql/Bag.java
index 24dd966..f6fbfe7 100644
--- a/core/src/main/java/org/apache/mrql/Bag.java
+++ b/core/src/main/java/org/apache/mrql/Bag.java
@@ -483,6 +483,10 @@
     }
 
     private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
         materialize();
         WritableUtils.writeVInt(out,size());
         for ( MRData e: this )
@@ -490,18 +494,9 @@
     }
 
     private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
-        int n = WritableUtils.readVInt(in);
-        mode = Modes.MATERIALIZED;
-        iterator = null;
-        path = null;
-        writer = null;
-        content = new ArrayList<MRData>(n);
-        for ( int i = 0; i < n; i++ )
-            add(MRContainer.read(in));
+        readFields(in);
     }
 
-    private void readObjectNoData () throws ObjectStreamException { };
-
     /** compare this Bag with a given Bag by comparing their associated elements */
     public int compareTo ( MRData x ) {
         Bag xt = (Bag)x;
diff --git a/core/src/main/java/org/apache/mrql/Inv.java b/core/src/main/java/org/apache/mrql/Inv.java
index 99db099..87bd28c 100644
--- a/core/src/main/java/org/apache/mrql/Inv.java
+++ b/core/src/main/java/org/apache/mrql/Inv.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 
 
 final public class Inv extends MRData {
@@ -43,7 +41,19 @@
     }
 
     public void readFields ( DataInput in ) throws IOException {
-        value.readFields(in);
+        value = MRContainer.read(in);
+    }
+
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        value.write(out);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
     }
 
     public int compareTo ( MRData x ) {
diff --git a/core/src/main/java/org/apache/mrql/Lambda.java b/core/src/main/java/org/apache/mrql/Lambda.java
index d0cfd7a..1b02084 100644
--- a/core/src/main/java/org/apache/mrql/Lambda.java
+++ b/core/src/main/java/org/apache/mrql/Lambda.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -42,6 +40,12 @@
         throw new Error("Functions are not serializable");
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {}
+
     public int compareTo ( MRData x ) {
         throw new Error("Functions cannot be compared");
     }
diff --git a/core/src/main/java/org/apache/mrql/MRContainer.java b/core/src/main/java/org/apache/mrql/MRContainer.java
index 42caa5c..692bddf 100644
--- a/core/src/main/java/org/apache/mrql/MRContainer.java
+++ b/core/src/main/java/org/apache/mrql/MRContainer.java
@@ -67,7 +67,10 @@
     public String toString () { return data.toString(); }
 
     final public static MRData read ( DataInput in ) throws IOException {
-        final byte tag = in.readByte();
+        return readData(in.readByte(),in);
+    }
+
+    final public static MRData readData ( byte tag, DataInput in ) throws IOException {
         switch (tag) {
         case TUPLE: return Tuple.read(in);
         case NULL: return new Tuple(0);
@@ -120,6 +123,10 @@
     }
 
     private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
         data.write(out);
     }
 
@@ -127,8 +134,6 @@
         data = read(in);
     }
 
-    private void readObjectNoData () throws ObjectStreamException {}
-
     final static class MR_EOLB extends MRData {
         MR_EOLB () {}
 
@@ -140,6 +145,12 @@
 
         public void readFields ( DataInput in ) throws IOException {}
 
+        private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {}
+
         public int compareTo ( MRData x ) { return 0; }
 
         public boolean equals ( Object x ) { return x instanceof MR_EOLB; }
diff --git a/core/src/main/java/org/apache/mrql/MRData.java b/core/src/main/java/org/apache/mrql/MRData.java
index 8c08269..962f1cd 100644
--- a/core/src/main/java/org/apache/mrql/MRData.java
+++ b/core/src/main/java/org/apache/mrql/MRData.java
@@ -24,4 +24,6 @@
 /** All MRQL data are encoded as MRData (similar to AVRO form) */
 public abstract class MRData implements WritableComparable<MRData>, Serializable {
     abstract public void materializeAll ();
+
+    abstract public void writeData ( ObjectOutputStream out ) throws IOException;
 }
diff --git a/core/src/main/java/org/apache/mrql/MR_bool.java b/core/src/main/java/org/apache/mrql/MR_bool.java
index ef3e453..1834b4a 100644
--- a/core/src/main/java/org/apache/mrql/MR_bool.java
+++ b/core/src/main/java/org/apache/mrql/MR_bool.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -50,6 +48,18 @@
         value = in.readBoolean();
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        out.writeBoolean(value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_bool);
         return (value == ((MR_bool) x).value) ? 0 : (value ? 1 : -1);
diff --git a/core/src/main/java/org/apache/mrql/MR_byte.java b/core/src/main/java/org/apache/mrql/MR_byte.java
index 489843d..b16a1fa 100644
--- a/core/src/main/java/org/apache/mrql/MR_byte.java
+++ b/core/src/main/java/org/apache/mrql/MR_byte.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -53,6 +51,18 @@
         value = in.readByte();
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        out.writeByte(value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_byte);
         byte v = ((MR_byte) x).value;
diff --git a/core/src/main/java/org/apache/mrql/MR_char.java b/core/src/main/java/org/apache/mrql/MR_char.java
index 7a70996..c551ece 100644
--- a/core/src/main/java/org/apache/mrql/MR_char.java
+++ b/core/src/main/java/org/apache/mrql/MR_char.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -51,6 +49,18 @@
         value = in.readChar();
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        out.writeChar(value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_char);
         return value-((MR_char) x).value;
diff --git a/core/src/main/java/org/apache/mrql/MR_dataset.java b/core/src/main/java/org/apache/mrql/MR_dataset.java
index 6515b1d..686a236 100644
--- a/core/src/main/java/org/apache/mrql/MR_dataset.java
+++ b/core/src/main/java/org/apache/mrql/MR_dataset.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -38,6 +36,14 @@
         new Bag(dataset.take(Integer.MAX_VALUE)).write(out);
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        throw new Error("DataSets are not serializable");
+    }
+
     public void readFields ( DataInput in ) throws IOException {
         throw new Error("DataSets are not serializable");
     }
diff --git a/core/src/main/java/org/apache/mrql/MR_double.java b/core/src/main/java/org/apache/mrql/MR_double.java
index c0e7d57..5b1bbb1 100644
--- a/core/src/main/java/org/apache/mrql/MR_double.java
+++ b/core/src/main/java/org/apache/mrql/MR_double.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -57,6 +55,18 @@
         return (value == v) ? 0 : ((value > v) ? 1 : -1);
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        out.writeDouble(value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
         size[0] = (Double.SIZE >> 3)+1;
         double v = WritableComparator.readDouble(x,xs) - WritableComparator.readDouble(y,ys);
diff --git a/core/src/main/java/org/apache/mrql/MR_float.java b/core/src/main/java/org/apache/mrql/MR_float.java
index 121a37c..2261ff0 100644
--- a/core/src/main/java/org/apache/mrql/MR_float.java
+++ b/core/src/main/java/org/apache/mrql/MR_float.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -53,6 +51,18 @@
         value = in.readFloat();
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        out.writeFloat(value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_float);
         float v = ((MR_float) x).value;
diff --git a/core/src/main/java/org/apache/mrql/MR_int.java b/core/src/main/java/org/apache/mrql/MR_int.java
index 37e5d9e..06b1294 100644
--- a/core/src/main/java/org/apache/mrql/MR_int.java
+++ b/core/src/main/java/org/apache/mrql/MR_int.java
@@ -49,6 +49,18 @@
         value = WritableUtils.readVInt(in);
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        WritableUtils.writeVInt(out,value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_int);
         return value - ((MR_int) x).value;
diff --git a/core/src/main/java/org/apache/mrql/MR_long.java b/core/src/main/java/org/apache/mrql/MR_long.java
index e16f2e7..1c75604 100644
--- a/core/src/main/java/org/apache/mrql/MR_long.java
+++ b/core/src/main/java/org/apache/mrql/MR_long.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -51,6 +49,18 @@
         value = WritableUtils.readVLong(in);
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        WritableUtils.writeVLong(out,value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_long);
         long v = ((MR_long) x).value;
diff --git a/core/src/main/java/org/apache/mrql/MR_more_bsp_steps.java b/core/src/main/java/org/apache/mrql/MR_more_bsp_steps.java
index 0d844ad..b338fe6 100644
--- a/core/src/main/java/org/apache/mrql/MR_more_bsp_steps.java
+++ b/core/src/main/java/org/apache/mrql/MR_more_bsp_steps.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 
 
 /** used for BSP synchronization when a peer needs to do more steps */
@@ -34,6 +32,12 @@
 
     public void readFields ( DataInput in ) throws IOException {}
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {}
+
     public int compareTo ( MRData x ) { return 0; }
 
     public boolean equals ( Object x ) { return x instanceof MR_more_bsp_steps; }
diff --git a/core/src/main/java/org/apache/mrql/MR_short.java b/core/src/main/java/org/apache/mrql/MR_short.java
index b6e34a8..214a41e 100644
--- a/core/src/main/java/org/apache/mrql/MR_short.java
+++ b/core/src/main/java/org/apache/mrql/MR_short.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -51,6 +49,18 @@
         value = in.readShort();
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        out.writeShort(value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_short);
         return value - ((MR_short) x).value;
diff --git a/core/src/main/java/org/apache/mrql/MR_string.java b/core/src/main/java/org/apache/mrql/MR_string.java
index 54cdd59..59af652 100644
--- a/core/src/main/java/org/apache/mrql/MR_string.java
+++ b/core/src/main/java/org/apache/mrql/MR_string.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 
@@ -51,6 +49,18 @@
         value = Text.readString(in);
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        Text.writeString(out,value);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof MR_string);
         return value.compareTo(((MR_string) x).value);
diff --git a/core/src/main/java/org/apache/mrql/MR_sync.java b/core/src/main/java/org/apache/mrql/MR_sync.java
index 275414f..e3ecbdb 100644
--- a/core/src/main/java/org/apache/mrql/MR_sync.java
+++ b/core/src/main/java/org/apache/mrql/MR_sync.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 
 
 /** used for BSP synchronization */
@@ -34,6 +32,12 @@
 
     public void readFields ( DataInput in ) throws IOException {}
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {}
+
     public int compareTo ( MRData x ) { return 0; }
 
     public boolean equals ( Object x ) { return x instanceof MR_sync; }
diff --git a/core/src/main/java/org/apache/mrql/MR_variable.java b/core/src/main/java/org/apache/mrql/MR_variable.java
index f81f117..c3a2b2f 100644
--- a/core/src/main/java/org/apache/mrql/MR_variable.java
+++ b/core/src/main/java/org/apache/mrql/MR_variable.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 
 
 /** a template variable; should appear only in a template */
@@ -34,6 +32,12 @@
 
     public void readFields ( DataInput in ) throws IOException {}
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {}
+
     public int compareTo ( MRData x ) { return 0; }
 
     public boolean equals ( Object x ) { return false; }
diff --git a/core/src/main/java/org/apache/mrql/Tuple.java b/core/src/main/java/org/apache/mrql/Tuple.java
index 13b0eeb..acfaa11 100644
--- a/core/src/main/java/org/apache/mrql/Tuple.java
+++ b/core/src/main/java/org/apache/mrql/Tuple.java
@@ -97,6 +97,20 @@
         return t;
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        WritableUtils.writeVInt(out,tuple.length);
+        for (short i = 0; i < tuple.length; i++)
+            tuple[i].write(out);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     final public static Tuple read2 ( DataInput in ) throws IOException {
         return new Tuple(MRContainer.read(in),MRContainer.read(in));
     }
@@ -107,7 +121,7 @@
 
     public void readFields ( DataInput in ) throws IOException {
         int n = WritableUtils.readVInt(in);
-        tuple = new Tuple[n];
+        tuple = new MRData[n];
         for ( short i = 0; i < n; i++ )
             tuple[i] = MRContainer.read(in);
     }
diff --git a/core/src/main/java/org/apache/mrql/Union.java b/core/src/main/java/org/apache/mrql/Union.java
index 5d92f32..41e53ce 100644
--- a/core/src/main/java/org/apache/mrql/Union.java
+++ b/core/src/main/java/org/apache/mrql/Union.java
@@ -17,8 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
+import java.io.*;
 import java.io.DataOutput;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
@@ -57,6 +56,19 @@
         value = MRContainer.read(in);
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        out.writeByte(tag);
+        value.write(out);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        readFields(in);
+    }
+
     public int compareTo ( MRData x ) {
         assert(x instanceof Union);
         Union p = (Union) x;
diff --git a/flink/src/main/java/org/apache/mrql/FData.java b/flink/src/main/java/org/apache/mrql/FData.java
index f7afc36..fbab256 100644
--- a/flink/src/main/java/org/apache/mrql/FData.java
+++ b/flink/src/main/java/org/apache/mrql/FData.java
@@ -40,7 +40,7 @@
 
     @Override
     public void write ( DataOutputView out ) throws IOException {
-        data.write(out);
+         data.write(out);
     }
 
     @Override
@@ -48,7 +48,7 @@
 
     @Override
     public void copyTo ( FData target ) {
-        target.data = data;
+         target.data = data;
     }
 
     @Override
diff --git a/flink/src/main/java/org/apache/mrql/MR_flink.java b/flink/src/main/java/org/apache/mrql/MR_flink.java
index d191de5..b7baa57 100644
--- a/flink/src/main/java/org/apache/mrql/MR_flink.java
+++ b/flink/src/main/java/org/apache/mrql/MR_flink.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.flink.api.java.DataSet;
 
 
@@ -41,6 +39,14 @@
         throw new Error("DataSets are not serializable");
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        throw new Error("DataSets are not serializable");
+    }
+
     public int compareTo ( MRData x ) {
         throw new Error("DataSets cannot be compared");
     }
diff --git a/gen/src/main/java/org/apache/mrql/gen/Node.java b/gen/src/main/java/org/apache/mrql/gen/Node.java
index 8bc55d4..98cd5cd 100644
--- a/gen/src/main/java/org/apache/mrql/gen/Node.java
+++ b/gen/src/main/java/org/apache/mrql/gen/Node.java
@@ -66,7 +66,7 @@
                  + children().tail().head().toString() + ")";
     }
 
-    private void writeObject(ObjectOutputStream out) throws IOException {
+    public void writeData(ObjectOutputStream out) throws IOException {
         out.defaultWriteObject();
     }
 
diff --git a/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java b/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java
index c4c0ed9..e073d34 100644
--- a/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java
+++ b/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java
@@ -41,7 +41,7 @@
 
     public String pretty ( int position ) { return value; }
 
-    private void writeObject(ObjectOutputStream out) throws IOException {
+    public void writeData(ObjectOutputStream out) throws IOException {
         out.defaultWriteObject();
     }
 
diff --git a/spark/src/main/java/org/apache/mrql/MR_rdd.java b/spark/src/main/java/org/apache/mrql/MR_rdd.java
index 4bf5a3b..8099bb3 100644
--- a/spark/src/main/java/org/apache/mrql/MR_rdd.java
+++ b/spark/src/main/java/org/apache/mrql/MR_rdd.java
@@ -17,9 +17,7 @@
  */
 package org.apache.mrql;
 
-import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.spark.api.java.JavaRDD;
@@ -43,6 +41,14 @@
         throw new Error("RDDs are not serializable");
     }
 
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        writeData(out);
+    }
+
+    public void writeData ( ObjectOutputStream out ) throws IOException {
+        throw new Error("RDDs are not serializable");
+    }
+
     public int compareTo ( MRData x ) {
         throw new Error("RDDs cannot be compared");
     }
diff --git a/spark/src/main/java/org/apache/mrql/RDDDataSource.java b/spark/src/main/java/org/apache/mrql/RDDDataSource.java
index b0345da..eeb5520 100644
--- a/spark/src/main/java/org/apache/mrql/RDDDataSource.java
+++ b/spark/src/main/java/org/apache/mrql/RDDDataSource.java
@@ -53,7 +53,6 @@
                     return acc.eval(t);
                 }
             },new Function2<MRData,MRData,MRData>() {
-                Tuple t = new Tuple(new Tuple(),new Tuple());
                 public MRData call ( MRData x, MRData y ) {
                     return (y.equals(zero)) ? x : y;
                 }