Merge branch 'master' into fsa_codegen2

Conflicts:
	parquet-column/pom.xml
	parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
	parquet-column/src/main/java/parquet/schema/PrimitiveType.java
	parquet-column/src/test/java/parquet/io/PerfTest.java
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index f3cd109..42aee96 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -36,6 +36,18 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>net.ledem</groupId>
+      <artifactId>brennus-builder</artifactId>
+      <version>0.1.0</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.ledem</groupId>
+      <artifactId>brennus-asm</artifactId>
+      <version>0.1.0</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-encoding</artifactId>
       <version>${project.version}</version>
diff --git a/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java b/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
index 0bf4ea0..82d5ed5 100644
--- a/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
+++ b/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
@@ -44,7 +44,6 @@
       @Override
       public Converter convertPrimitiveType(List<GroupType> path, PrimitiveType primitiveType) {
         return new PrimitiveConverter() {
-
           @Override
           public void addBinary(Binary value) {
             a = value;
diff --git a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
index 58450d8..6fee2ae 100644
--- a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
+++ b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
@@ -20,6 +20,8 @@
 import parquet.column.ColumnReadStore;
 import parquet.io.RecordReaderImplementation.State;
 import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
 import parquet.io.api.RecordConsumer;
 import parquet.io.api.RecordMaterializer;
 
@@ -27,7 +29,6 @@
 public abstract class BaseRecordReader<T> extends RecordReader<T> {
   private static final Log LOG = Log.getLog(BaseRecordReader.class);
 
-  public RecordConsumer recordConsumer;
   public RecordMaterializer<T> recordMaterializer;
   public ColumnReadStore columnStore;
   @Override
@@ -40,10 +41,6 @@
 
   State[] caseLookup;
 
-  private String endField;
-
-  private int endIndex;
-
   protected void currentLevel(int currentLevel) {
     if (DEBUG) LOG.debug("currentLevel: "+currentLevel);
   }
@@ -56,85 +53,6 @@
     return caseLookup[state].getCase(currentLevel, d, nextR).getID();
   }
 
-  final protected void startMessage() {
-    // reset state
-    endField = null;
-    if (DEBUG) LOG.debug("startMessage()");
-    recordConsumer.startMessage();
-  }
-
-  final protected void startGroup(String field, int index) {
-    startField(field, index);
-    if (DEBUG) LOG.debug("startGroup()");
-    recordConsumer.startGroup();
-  }
-
-  private void startField(String field, int index) {
-    if (DEBUG) LOG.debug("startField("+field+","+index+")");
-    if (endField != null && index == endIndex) {
-      // skip the close/open tag
-      endField = null;
-    } else {
-      if (endField != null) {
-        // close the previous field
-        recordConsumer.endField(endField, endIndex);
-        endField = null;
-      }
-      recordConsumer.startField(field, index);
-    }
-  }
-
-  final protected void addPrimitiveINT64(String field, int index, long value) {
-    startField(field, index);
-    if (DEBUG) LOG.debug("addLong("+value+")");
-    recordConsumer.addLong(value);
-    endField(field, index);
-  }
-
-  private void endField(String field, int index) {
-    if (DEBUG) LOG.debug("endField("+field+","+index+")");
-    if (endField != null) {
-      recordConsumer.endField(endField, endIndex);
-    }
-    endField = field;
-    endIndex = index;
-  }
-
-  final protected void addPrimitiveBINARY(String field, int index, Binary value) {
-    startField(field, index);
-    if (DEBUG) LOG.debug("addBinary("+value+")");
-    recordConsumer.addBinary(value);
-    endField(field, index);
-  }
-
-  final protected void addPrimitiveINT32(String field, int index, int value) {
-    startField(field, index);
-    if (DEBUG) LOG.debug("addInteger("+value+")");
-    recordConsumer.addInteger(value);
-    endField(field, index);
-  }
-
-  final protected void endGroup(String field, int index) {
-    if (endField != null) {
-      // close the previous field
-      recordConsumer.endField(endField, endIndex);
-      endField = null;
-    }
-    if (DEBUG) LOG.debug("endGroup()");
-    recordConsumer.endGroup();
-    endField(field, index);
-  }
-
-  final protected void endMessage() {
-    if (endField != null) {
-      // close the previous field
-      recordConsumer.endField(endField, endIndex);
-      endField = null;
-    }
-    if (DEBUG) LOG.debug("endMessage()");
-    recordConsumer.endMessage();
-  }
-
   protected void error(String message) {
     throw new ParquetDecodingException(message);
   }
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java b/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java
new file mode 100644
index 0000000..72e3dde
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java
@@ -0,0 +1,296 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.io;
+
+import static brennus.model.ExistingType.INT;
+import static brennus.model.ExistingType.VOID;
+import static brennus.model.ExistingType.existing;
+import static brennus.model.Protection.PUBLIC;
+import static parquet.Log.DEBUG;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+import parquet.column.ColumnReader;
+import parquet.io.RecordReaderImplementation.Case;
+import parquet.io.RecordReaderImplementation.State;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+import brennus.Builder;
+import brennus.ClassBuilder;
+import brennus.ElseBuilder;
+import brennus.Function;
+import brennus.MethodBuilder;
+import brennus.StatementBuilder;
+import brennus.SwitchBuilder;
+import brennus.ThenBuilder;
+import brennus.asm.ASMTypeGenerator;
+import brennus.model.FutureType;
+import brennus.printer.TypePrinter;
+
+public class RecordReaderCompiler {
+
+  private static final String END_RECORD = "endRecord";
+
+  public static class DynamicClassLoader extends ClassLoader {
+    ASMTypeGenerator asmTypeGenerator = new ASMTypeGenerator();
+
+    public Class<?> define(FutureType type) {
+      if (DEBUG) new TypePrinter().print(type);
+      byte[] classBytes = asmTypeGenerator.generate(type);
+      return super.defineClass(type.getName(), classBytes, 0, classBytes.length);
+    }
+  }
+
+  private DynamicClassLoader cl = new DynamicClassLoader();
+  private int id = 0;
+
+  private <S extends StatementBuilder<S>> S generateCase(boolean addPrimitive, State state, Case currentCase, S builder, int stateCount) {
+    if (currentCase.isGoingUp()) {
+      // Generating the following loop:
+      //  for (; currentLevel <= depth; ++currentLevel) {
+      //    currentState.groupConverterPath[currentLevel].start();
+      //  }
+      for (int i = currentCase.getStartLevel(); i <= currentCase.getDepth(); i++) {
+        // Generating the following call:
+        // currentState.groupConverterPath[currentLevel].start();
+        builder = builder.exec().get(groupConverterName(state.id, i)).callNoParam("start").endExec();
+      }
+    }
+    if (addPrimitive) {
+      // Generating the following call:
+      // currentState.primitiveConverter.add<Type>(value);  (e.g. addLong(value_N))
+        builder = builder.exec().get(primitiveConverterName(state.id))
+            .call(state.primitive.addMethod())
+              .get("value_"+state.id)
+            .endCall().endExec();
+    }
+    if (currentCase.isGoingDown()) {
+      // Generating the following loop
+      //  for (; currentLevel > next; currentLevel--) {
+      //    currentState.groupConverterPath[currentLevel - 1].end();
+      //  }
+      for (int i = currentCase.getDepth() + 1; i > currentCase.getNextLevel(); i--) {
+        builder = builder.exec().get(groupConverterName(state.id, i - 1)).callNoParam("end").endExec();
+      }
+    }
+    // set currentLevel to its new value
+    if (currentCase.isGoingDown()) {
+      builder = builder
+          .set("currentLevel").literal(currentCase.getNextLevel()).endSet();
+    } else if (currentCase.isGoingUp()) {
+      builder = builder
+          .set("currentLevel").literal(currentCase.getDepth() + 1).endSet();
+    } else {
+      // currentLevel stays the same
+    }
+    int nextReader = currentCase.getNextState();
+    String label = getStateLabel(stateCount, nextReader);
+    builder = builder.gotoLabel(label);
+    return builder;
+  }
+
+  private String getStateLabel(int stateCount, int stateId) {
+    return stateId == stateCount ? END_RECORD : "state_" + stateId;
+  }
+
+  private <S extends StatementBuilder<S>> S generateSwitch(S builder, final boolean defined, final State state, final int stateCount) {
+    final List<Case> cases = defined ? state.getDefinedCases() : state.getUndefinedCases();
+    String columnReader = "state_"+state.id+"_column";
+    if (defined) {
+      // if defined we need to save the current value
+      builder = builder
+          .var(existing(state.primitive.javaType), "value_"+state.id)
+          .set("value_"+state.id).get(columnReader).callNoParam(state.primitive.getMethod()).endSet();
+    }
+    builder = builder
+        .exec().get(columnReader).callNoParam("consume").endExec();
+    if (state.maxRepetitionLevel == 0) {
+      // in that case nextR is always 0
+      builder = builder // TODO: instead change the case lookup code
+          .set("nextR").literal(0).endSet();
+    } else {
+      builder = builder
+          .set("nextR").get(columnReader).callNoParam("getCurrentRepetitionLevel").endSet();
+    }
+    if (cases.size() == 1) {
+      // then no need to switch, directly generate the body of the case
+      final Case currentCase = cases.get(0);
+      return generateCase(defined, state, currentCase, builder, stateCount);
+    } else {
+      // more than one case: generate a switch
+      return builder
+        .switchOn()
+          .callOnThis("getCaseId").literal(state.id).nextParam().get("currentLevel").nextParam().get("d").nextParam().get("nextR").endCall()
+            .switchBlock().transform(
+              new Function<SwitchBuilder<S>, SwitchBuilder<S>>() {
+                public SwitchBuilder<S> apply(SwitchBuilder<S> builder) {
+                  for (Case currentCase : cases) {
+                    if (currentCase.isGoingUp() || defined || currentCase.isGoingDown()) {
+                      builder =
+                          generateCase(defined, state, currentCase, builder.caseBlock(currentCase.getID()), stateCount)
+                          .endCase();
+                    } else {
+                      // if nothing to do, directly jump to the next state
+                      String label = getStateLabel(stateCount, currentCase.getNextState());
+                      builder = builder.gotoLabel(currentCase.getID(), label);
+                    }
+                  }
+                  return builder;
+                }
+              })
+          .defaultCase()
+            // a default case to be safe: this should never happen
+            .exec().callOnThis("error").literal("unknown case").endCall().endExec()
+          .breakCase()
+        .endSwitch();
+    }
+  }
+
+  public <T> RecordReader<T> compile(final RecordReaderImplementation<T> recordReader) {
+    final int stateCount = recordReader.getStateCount();
+    // create a unique class name
+    String className = this.getClass().getName()+"$CompiledRecordReader"+(++id);
+    ClassBuilder classBuilder = new Builder(false)
+        .startClass(className, existing(BaseRecordReader.class))
+        .field(PUBLIC, existing(GroupConverter.class), "recordConsumer"); // converters root
+    for (int i = 0; i < stateCount; i++) {
+      State state = recordReader.getState(i);
+      // TODO: look into using the actual class. It fails when the class is private. maybe setAccessible?
+      // create a field for each column reader
+      classBuilder = classBuilder
+        .field(PUBLIC, existing(ColumnReader.class), "state_"+ i +"_column")
+        .field(PUBLIC, existing(PrimitiveConverter.class), primitiveConverterName(i)); // primitiveConverter
+
+      for (int j = 0; j < state.groupConverterPath.length; j++) {
+        classBuilder = classBuilder
+            .field(PUBLIC, existing(GroupConverter.class), groupConverterName(i, j)); // groupConverters
+      }
+
+    }
+
+    MethodBuilder readMethodBuilder = classBuilder
+        .startMethod(PUBLIC, VOID, "readOneRecord")
+          // declare variables
+          .var(INT, "currentLevel")
+          .var(INT, "d")
+          .var(INT, "nextR")
+          // debug statement
+          .transform(this.<MethodBuilder>debug("startMessage"))
+          //  generating: recordConsumer.start();
+          .exec().get("recordConsumer").callNoParam("start").endExec()
+          // initially: currentLevel = 0;
+          .set("currentLevel").literal(0).endSet();
+    for (int i = 0; i < stateCount; i++) {
+      // generate the code for each state of the FSA
+      final State state = recordReader.getState(i);
+      String columnReader = "state_"+i+"_column";
+      readMethodBuilder = readMethodBuilder
+          .label("state_"+i)
+          .transform(this.<MethodBuilder>debug("state "+i));
+
+      if (state.maxDefinitionLevel == 0) {
+        // then it is always defined, we can skip the if
+        readMethodBuilder = generateSwitch(readMethodBuilder, true, state, stateCount);
+      } else {
+        readMethodBuilder = readMethodBuilder
+            // generating:
+            //  int d = columnReader.getCurrentDefinitionLevel();
+            .set("d").get(columnReader).callNoParam("getCurrentDefinitionLevel").endSet()
+            // if it is defined (d == maxDefinitionLevel) then
+            .ifExp().get("d").isEqualTo().literal(state.maxDefinitionLevel).thenBlock()
+              .transform(new Function<ThenBuilder<MethodBuilder>, ThenBuilder<MethodBuilder>>() {
+                public ThenBuilder<MethodBuilder> apply(ThenBuilder<MethodBuilder> builder) {
+                  // generate the switch in the defined case (primitive values will be created)
+                  return generateSwitch(builder, true, state, stateCount);
+                }
+              })
+            .elseBlock() // otherwise:
+              .transform(new Function<ElseBuilder<MethodBuilder>, ElseBuilder<MethodBuilder>>() {
+                public ElseBuilder<MethodBuilder> apply(ElseBuilder<MethodBuilder> builder) {
+                  // generate the switch in the undefined case (primitive values will not be created)
+                  return generateSwitch(builder, false, state, stateCount);
+                }
+              })
+            .endIf();
+      }
+    }
+
+    FutureType testClass = readMethodBuilder
+            .label(END_RECORD)
+            //  generating: recordConsumer.end();
+            .exec().get("recordConsumer").callNoParam("end").endExec()
+          .endMethod()
+        .endClass();
+
+    cl.define(testClass);
+    try {
+      Class<?> generated = (Class<?>)cl.loadClass(className);
+      BaseRecordReader<T> compiledRecordReader = (BaseRecordReader<T>)generated.getConstructor().newInstance();
+      try {
+      generated.getField("recordConsumer").set(compiledRecordReader, recordReader.getRecordConsumer());
+      compiledRecordReader.caseLookup = new State[stateCount];
+      for (int i = 0; i < stateCount; i++) {
+        State state = recordReader.getState(i);
+          generated.getField("state_"+i+"_column").set(compiledRecordReader, state.column);
+          generated.getField(primitiveConverterName(i)).set(compiledRecordReader, state.primitiveConverter);
+          for (int j = 0; j < state.groupConverterPath.length; j++) {
+            generated.getField(groupConverterName(i, j)).set(compiledRecordReader, state.groupConverterPath[j]);
+          }
+        compiledRecordReader.caseLookup[i] = state;
+      }
+      } catch (NoSuchFieldException e) {
+        throw new CompilationException("bug: can't find field", e);
+      }
+      compiledRecordReader.recordMaterializer = recordReader.getMaterializer();
+      return compiledRecordReader;
+    } catch (ClassNotFoundException e) {
+      throw new CompilationException("generated class "+className+" could not be loaded", e);
+    } catch (InstantiationException e) {
+      throw new CompilationException("generated class "+className+" could not be instantiated", e);
+    } catch (IllegalAccessException e) {
+      throw new CompilationException("generated class "+className+" is not accessible", e);
+    } catch (IllegalArgumentException e) {
+      throw new CompilationException("generated class "+className+" could not be instantiated", e);
+    } catch (SecurityException e) {
+      throw new CompilationException("generated class "+className+" could not be instantiated", e);
+    } catch (InvocationTargetException e) {
+      throw new CompilationException("generated class "+className+" could not be instantiated", e);
+    } catch (NoSuchMethodException e) {
+      throw new CompilationException("generated class "+className+" could not be instantiated", e);
+    }
+  }
+
+  String groupConverterName(int state, int level) {
+    return "state_" + state + "_groupConverterPath_" + level;
+  }
+
+  String primitiveConverterName(int state) {
+    return "state_"+ state + "_primitive";
+  }
+
+  private <S extends StatementBuilder<S>> Function<S, S> debug(final String message) {
+    return new Function<S, S>() {
+      @Override
+      public S apply(S builder) {
+        if (DEBUG) builder = builder.exec().callOnThis("log").literal(message).endCall().endExec();
+        return builder;
+      }
+
+    };
+  }
+}
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
index 4d66278..a4a993b 100644
--- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
+++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
@@ -444,7 +444,7 @@
     return recordMaterializer;
   }
 
-  protected Converter getRecordConsumer() {
+  protected GroupConverter getRecordConsumer() {
     return recordConsumer;
   }
 
diff --git a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
index 875db02..bf56df7 100644
--- a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
@@ -60,7 +60,8 @@
    * @author Julien Le Dem
    */
   public static enum PrimitiveTypeName {
-    INT64("getLong", Long.TYPE) {
+
+    INT64("Long", Long.TYPE) {
       @Override
       public String toString(ColumnReader columnReader) {
         return String.valueOf(columnReader.getLong());
@@ -83,7 +84,7 @@
         return converter.convertINT64(this);
       }
     },
-    INT32("getInteger", Integer.TYPE) {
+    INT32("Integer", Integer.TYPE) {
       @Override
       public String toString(ColumnReader columnReader) {
         return String.valueOf(columnReader.getInteger());
@@ -106,7 +107,7 @@
         return converter.convertINT32(this);
       }
     },
-    BOOLEAN("getBoolean", Boolean.TYPE) {
+    BOOLEAN("Boolean", Boolean.TYPE) {
       @Override
       public String toString(ColumnReader columnReader) {
         return String.valueOf(columnReader.getBoolean());
@@ -129,7 +130,7 @@
         return converter.convertBOOLEAN(this);
       }
     },
-    BINARY("getBinary", Binary.class) {
+    BINARY("Binary", Binary.class) {
       @Override
       public String toString(ColumnReader columnReader) {
         return String.valueOf(columnReader.getBinary());
@@ -152,7 +153,7 @@
         return converter.convertBINARY(this);
       }
     },
-    FLOAT("getFloat", Float.TYPE) {
+    FLOAT("Float", Float.TYPE) {
       @Override
       public String toString(ColumnReader columnReader) {
         return String.valueOf(columnReader.getFloat());
@@ -175,7 +176,7 @@
         return converter.convertFLOAT(this);
       }
     },
-    DOUBLE("getDouble", Double.TYPE) {
+    DOUBLE("Double", Double.TYPE) {
       @Override
       public String toString(ColumnReader columnReader) {
         return String.valueOf(columnReader.getDouble());
@@ -244,14 +245,23 @@
       }
     };
 
-    public final String getMethod;
+    private final String internalName;
     public final Class<?> javaType;
 
-    private PrimitiveTypeName(String getMethod, Class<?> javaType) {
-      this.getMethod = getMethod;
+    private PrimitiveTypeName(String internalName, Class<?> javaType) {
+      this.internalName = internalName;
       this.javaType = javaType;
     }
 
+    public String getMethod() {
+      return "get" + internalName;
+    }
+
+    public String addMethod() {
+      return "add" + internalName;
+    }
+
+
     /**
      * reads the value from the columnReader with the appropriate accessor and returns a String representation
      * @param columnReader
diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java
index 7372941..7424950 100644
--- a/parquet-column/src/test/java/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/parquet/io/PerfTest.java
@@ -35,6 +35,9 @@
 /**
  * make sure {@link Log#LEVEL} is set to {@link Level#OFF}
  *
+ * run with -verbose:gc -Xmx2g -Xms2g
+ * There should be no gc logs in between <<<>>>
+ *
  * @author Julien Le Dem
  *
  */
@@ -47,17 +50,35 @@
   }
 
   private static void read(MemPageStore memPageStore) {
-    read(memPageStore, schema, "read all");
-    read(memPageStore, schema2, "read projected");
-    read(memPageStore, schema3, "read projected no Strings");
+//    readDynamic(memPageStore, schema, "read all");
+    readDynamic(memPageStore, schema, "read all");
+//    readCompiled(memPageStore, schema, "read all");
+    readCompiled(memPageStore, schema, "read all");
+    readDynamic(memPageStore, schema2, "read projected");
+//    readCompiled(memPageStore, schema2, "read projected");
+    readCompiled(memPageStore, schema2, "read projected");
+    readDynamic(memPageStore, schema3, "read projected no Strings");
+//    readCompiled(memPageStore, schema3, "read projected no Strings");
+    readCompiled(memPageStore, schema3, "read projected no Strings");
   }
 
+  static void readDynamic(MemPageStore memPageStore, MessageType schema, String message) {
+    read(memPageStore, schema, message, false);
+  }
+  static void readCompiled(MemPageStore memPageStore, MessageType schema, String message) {
+    read(memPageStore, schema, message, true);
+  }
   private static void read(MemPageStore memPageStore, MessageType myschema,
-      String message) {
+      String message, boolean compiled) {
     MessageColumnIO columnIO = newColumnFactory(myschema);
-    System.out.println(message);
+
+    System.out.println(message + (compiled ? " compiled" : ""));
     RecordMaterializer<Object> recordConsumer = new DummyRecordConverter(myschema);
+
     RecordReader<Object> recordReader = columnIO.getRecordReader(memPageStore, recordConsumer);
+    if (compiled) {
+      recordReader = new RecordReaderCompiler().compile((RecordReaderImplementation<Object>)recordReader);
+    }
 
     read(recordReader, 2, myschema);
     read(recordReader, 10000, myschema);
diff --git a/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java b/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java
new file mode 100644
index 0000000..988004b
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.io;
+
+import static parquet.example.Paper.r1;
+import static parquet.example.Paper.schema;
+import static parquet.io.TestColumnIO.expectedEventsForR1;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+
+import org.junit.Test;
+
+import parquet.column.ColumnWriteStore;
+import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.page.mem.MemPageStore;
+import parquet.example.data.GroupWriter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.RecordReader;
+import parquet.io.RecordReaderCompiler;
+import parquet.io.RecordReaderImplementation;
+
+public class TestRecordReaderCompiler {
+
+  @Test
+  public void testRecordReaderCompiler() {
+
+    Logger.getLogger("brennus").setLevel(Level.FINEST);
+    Logger.getLogger("brennus").addHandler(new Handler() {
+      public void publish(LogRecord record) {
+        System.err.println(record.getMessage());
+      }
+      public void flush() {
+        System.err.flush();
+      }
+      public void close() throws SecurityException {
+        System.err.flush();
+      }
+    });
+
+    MemPageStore memPageStore = new MemPageStore();
+    ColumnWriteStore writeStore = new ColumnWriteStoreImpl(memPageStore, 1024*1024*1, 1024*1024*1, false);
+    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+    new GroupWriter(columnIO.getRecordWriter(writeStore), schema).write(r1);
+    writeStore.flush();
+    System.err.flush();
+    Logger.getLogger("brennus").info("compile");
+    System.out.println("compile");
+    RecordReader<Void> recordReader = columnIO.getRecordReader(
+        memPageStore,
+        new ExpectationValidatingConverter(
+            new ArrayDeque<String>(Arrays.asList(expectedEventsForR1)), schema));
+    recordReader = new RecordReaderCompiler().compile((RecordReaderImplementation<Void>)recordReader);
+
+    Logger.getLogger("brennus").info("read");
+    System.out.println("read");
+    recordReader.read();
+
+  }
+}