Merge branch 'master' into fsa_codegen2
Conflicts:
parquet-column/pom.xml
parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
parquet-column/src/main/java/parquet/schema/PrimitiveType.java
parquet-column/src/test/java/parquet/io/PerfTest.java
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index f3cd109..42aee96 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -36,6 +36,18 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>net.ledem</groupId>
+ <artifactId>brennus-builder</artifactId>
+ <version>0.1.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.ledem</groupId>
+ <artifactId>brennus-asm</artifactId>
+ <version>0.1.0</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>com.twitter</groupId>
<artifactId>parquet-encoding</artifactId>
<version>${project.version}</version>
diff --git a/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java b/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
index 0bf4ea0..82d5ed5 100644
--- a/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
+++ b/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
@@ -44,7 +44,6 @@
@Override
public Converter convertPrimitiveType(List<GroupType> path, PrimitiveType primitiveType) {
return new PrimitiveConverter() {
-
@Override
public void addBinary(Binary value) {
a = value;
diff --git a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
index 58450d8..6fee2ae 100644
--- a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
+++ b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
@@ -20,6 +20,8 @@
import parquet.column.ColumnReadStore;
import parquet.io.RecordReaderImplementation.State;
import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
import parquet.io.api.RecordConsumer;
import parquet.io.api.RecordMaterializer;
@@ -27,7 +29,6 @@
public abstract class BaseRecordReader<T> extends RecordReader<T> {
private static final Log LOG = Log.getLog(BaseRecordReader.class);
- public RecordConsumer recordConsumer;
public RecordMaterializer<T> recordMaterializer;
public ColumnReadStore columnStore;
@Override
@@ -40,10 +41,6 @@
State[] caseLookup;
- private String endField;
-
- private int endIndex;
-
protected void currentLevel(int currentLevel) {
if (DEBUG) LOG.debug("currentLevel: "+currentLevel);
}
@@ -56,85 +53,6 @@
return caseLookup[state].getCase(currentLevel, d, nextR).getID();
}
- final protected void startMessage() {
- // reset state
- endField = null;
- if (DEBUG) LOG.debug("startMessage()");
- recordConsumer.startMessage();
- }
-
- final protected void startGroup(String field, int index) {
- startField(field, index);
- if (DEBUG) LOG.debug("startGroup()");
- recordConsumer.startGroup();
- }
-
- private void startField(String field, int index) {
- if (DEBUG) LOG.debug("startField("+field+","+index+")");
- if (endField != null && index == endIndex) {
- // skip the close/open tag
- endField = null;
- } else {
- if (endField != null) {
- // close the previous field
- recordConsumer.endField(endField, endIndex);
- endField = null;
- }
- recordConsumer.startField(field, index);
- }
- }
-
- final protected void addPrimitiveINT64(String field, int index, long value) {
- startField(field, index);
- if (DEBUG) LOG.debug("addLong("+value+")");
- recordConsumer.addLong(value);
- endField(field, index);
- }
-
- private void endField(String field, int index) {
- if (DEBUG) LOG.debug("endField("+field+","+index+")");
- if (endField != null) {
- recordConsumer.endField(endField, endIndex);
- }
- endField = field;
- endIndex = index;
- }
-
- final protected void addPrimitiveBINARY(String field, int index, Binary value) {
- startField(field, index);
- if (DEBUG) LOG.debug("addBinary("+value+")");
- recordConsumer.addBinary(value);
- endField(field, index);
- }
-
- final protected void addPrimitiveINT32(String field, int index, int value) {
- startField(field, index);
- if (DEBUG) LOG.debug("addInteger("+value+")");
- recordConsumer.addInteger(value);
- endField(field, index);
- }
-
- final protected void endGroup(String field, int index) {
- if (endField != null) {
- // close the previous field
- recordConsumer.endField(endField, endIndex);
- endField = null;
- }
- if (DEBUG) LOG.debug("endGroup()");
- recordConsumer.endGroup();
- endField(field, index);
- }
-
- final protected void endMessage() {
- if (endField != null) {
- // close the previous field
- recordConsumer.endField(endField, endIndex);
- endField = null;
- }
- if (DEBUG) LOG.debug("endMessage()");
- recordConsumer.endMessage();
- }
-
protected void error(String message) {
throw new ParquetDecodingException(message);
}
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java b/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java
new file mode 100644
index 0000000..72e3dde
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java
@@ -0,0 +1,296 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.io;
+
+import static brennus.model.ExistingType.INT;
+import static brennus.model.ExistingType.VOID;
+import static brennus.model.ExistingType.existing;
+import static brennus.model.Protection.PUBLIC;
+import static parquet.Log.DEBUG;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+import parquet.column.ColumnReader;
+import parquet.io.RecordReaderImplementation.Case;
+import parquet.io.RecordReaderImplementation.State;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+import brennus.Builder;
+import brennus.ClassBuilder;
+import brennus.ElseBuilder;
+import brennus.Function;
+import brennus.MethodBuilder;
+import brennus.StatementBuilder;
+import brennus.SwitchBuilder;
+import brennus.ThenBuilder;
+import brennus.asm.ASMTypeGenerator;
+import brennus.model.FutureType;
+import brennus.printer.TypePrinter;
+
+public class RecordReaderCompiler {
+
+ private static final String END_RECORD = "endRecord";
+
+ public static class DynamicClassLoader extends ClassLoader {
+ ASMTypeGenerator asmTypeGenerator = new ASMTypeGenerator();
+
+ public Class<?> define(FutureType type) {
+ if (DEBUG) new TypePrinter().print(type);
+ byte[] classBytes = asmTypeGenerator.generate(type);
+ return super.defineClass(type.getName(), classBytes, 0, classBytes.length);
+ }
+ }
+
+ private DynamicClassLoader cl = new DynamicClassLoader();
+ private int id = 0;
+
+ private <S extends StatementBuilder<S>> S generateCase(boolean addPrimitive, State state, Case currentCase, S builder, int stateCount) {
+ if (currentCase.isGoingUp()) {
+ // Generating the following loop:
+ // for (; currentLevel <= depth; ++currentLevel) {
+ // currentState.groupConverterPath[currentLevel].start();
+ // }
+ for (int i = currentCase.getStartLevel(); i <= currentCase.getDepth(); i++) {
+ // Generating the following call:
+ // currentState.groupConverterPath[currentLevel].start();
+ builder = builder.exec().get(groupConverterName(state.id, i)).callNoParam("start").endExec();
+ }
+ }
+ if (addPrimitive) {
+ // Generating the following call:
+ // currentState.primitive.addValueToPrimitiveConverter(currentState.primitiveConverter, columnReader);
+ builder = builder.exec().get(primitiveConverterName(state.id))
+ .call(state.primitive.addMethod())
+ .get("value_"+state.id)
+ .endCall().endExec();
+ }
+ if (currentCase.isGoingDown()) {
+ // Generating the following loop
+ // for (; currentLevel > next; currentLevel--) {
+ // currentState.groupConverterPath[currentLevel - 1].end();
+ // }
+ for (int i = currentCase.getDepth() + 1; i > currentCase.getNextLevel(); i--) {
+ builder = builder.exec().get(groupConverterName(state.id, i - 1)).callNoParam("end").endExec();
+ }
+ }
+ // set currentLevel to its new value
+ if (currentCase.isGoingDown()) {
+ builder = builder
+ .set("currentLevel").literal(currentCase.getNextLevel()).endSet();
+ } else if (currentCase.isGoingUp()) {
+ builder = builder
+ .set("currentLevel").literal(currentCase.getDepth() + 1).endSet();
+ } else {
+ // currentLevel stays the same
+ }
+ int nextReader = currentCase.getNextState();
+ String label = getStateLabel(stateCount, nextReader);
+ builder = builder.gotoLabel(label);
+ return builder;
+ }
+
+ private String getStateLabel(int stateCount, int stateId) {
+ return stateId == stateCount ? END_RECORD : "state_" + stateId;
+ }
+
+ private <S extends StatementBuilder<S>> S generateSwitch(S builder, final boolean defined, final State state, final int stateCount) {
+ final List<Case> cases = defined ? state.getDefinedCases() : state.getUndefinedCases();
+ String columnReader = "state_"+state.id+"_column";
+ if (defined) {
+ // if defined we need to save the current value
+ builder = builder
+ .var(existing(state.primitive.javaType), "value_"+state.id)
+ .set("value_"+state.id).get(columnReader).callNoParam(state.primitive.getMethod()).endSet();
+ }
+ builder = builder
+ .exec().get(columnReader).callNoParam("consume").endExec();
+ if (state.maxRepetitionLevel == 0) {
+ // in that case nextR is always 0
+ builder = builder // TODO: instead change the case lookup code
+ .set("nextR").literal(0).endSet();
+ } else {
+ builder = builder
+ .set("nextR").get(columnReader).callNoParam("getCurrentRepetitionLevel").endSet();
+ }
+ if (cases.size() == 1) {
+ // then no need to switch, directly generate the body of the case
+ final Case currentCase = cases.get(0);
+ return generateCase(defined, state, currentCase, builder, stateCount);
+ } else {
+ // more than one case: generate a switch
+ return builder
+ .switchOn()
+ .callOnThis("getCaseId").literal(state.id).nextParam().get("currentLevel").nextParam().get("d").nextParam().get("nextR").endCall()
+ .switchBlock().transform(
+ new Function<SwitchBuilder<S>, SwitchBuilder<S>>() {
+ public SwitchBuilder<S> apply(SwitchBuilder<S> builder) {
+ for (Case currentCase : cases) {
+ if (currentCase.isGoingUp() || defined || currentCase.isGoingDown()) {
+ builder =
+ generateCase(defined, state, currentCase, builder.caseBlock(currentCase.getID()), stateCount)
+ .endCase();
+ } else {
+ // if nothing to do, directly jump to the next state
+ String label = getStateLabel(stateCount, currentCase.getNextState());
+ builder = builder.gotoLabel(currentCase.getID(), label);
+ }
+ }
+ return builder;
+ }
+ })
+ .defaultCase()
+ // a default case to be safe: this should never happen
+ .exec().callOnThis("error").literal("unknown case").endCall().endExec()
+ .breakCase()
+ .endSwitch();
+ }
+ }
+
+ public <T> RecordReader<T> compile(final RecordReaderImplementation<T> recordReader) {
+ final int stateCount = recordReader.getStateCount();
+ // create a unique class name
+ String className = this.getClass().getName()+"$CompiledRecordReader"+(++id);
+ ClassBuilder classBuilder = new Builder(false)
+ .startClass(className, existing(BaseRecordReader.class))
+ .field(PUBLIC, existing(GroupConverter.class), "recordConsumer"); // converters root
+ for (int i = 0; i < stateCount; i++) {
+ State state = recordReader.getState(i);
+ // TODO: look into using the actual class. It fails when the class is private; maybe use setAccessible?
+ // create a field for each column reader
+ classBuilder = classBuilder
+ .field(PUBLIC, existing(ColumnReader.class), "state_"+ i +"_column")
+ .field(PUBLIC, existing(PrimitiveConverter.class), primitiveConverterName(i)); // primitiveConverter
+
+ for (int j = 0; j < state.groupConverterPath.length; j++) {
+ classBuilder = classBuilder
+ .field(PUBLIC, existing(GroupConverter.class), groupConverterName(i, j)); // groupConverters
+ }
+
+ }
+
+ MethodBuilder readMethodBuilder = classBuilder
+ .startMethod(PUBLIC, VOID, "readOneRecord")
+ // declare variables
+ .var(INT, "currentLevel")
+ .var(INT, "d")
+ .var(INT, "nextR")
+ // debug statement
+ .transform(this.<MethodBuilder>debug("startMessage"))
+ // generating: recordConsumer.start();
+ .exec().get("recordConsumer").callNoParam("start").endExec()
+ // initially: currentLevel = 0;
+ .set("currentLevel").literal(0).endSet();
+ for (int i = 0; i < stateCount; i++) {
+ // generate the code for each state of the FSA
+ final State state = recordReader.getState(i);
+ String columnReader = "state_"+i+"_column";
+ readMethodBuilder = readMethodBuilder
+ .label("state_"+i)
+ .transform(this.<MethodBuilder>debug("state "+i));
+
+ if (state.maxDefinitionLevel == 0) {
+ // then it is always defined, we can skip the if
+ readMethodBuilder = generateSwitch(readMethodBuilder, true, state, stateCount);
+ } else {
+ readMethodBuilder = readMethodBuilder
+ // generating:
+ // int d = columnReader.getCurrentDefinitionLevel();
+ .set("d").get(columnReader).callNoParam("getCurrentDefinitionLevel").endSet()
+ // if it is defined (d == maxDefinitionLevel) then
+ .ifExp().get("d").isEqualTo().literal(state.maxDefinitionLevel).thenBlock()
+ .transform(new Function<ThenBuilder<MethodBuilder>, ThenBuilder<MethodBuilder>>() {
+ public ThenBuilder<MethodBuilder> apply(ThenBuilder<MethodBuilder> builder) {
+ // generate the switch in the defined case (primitive values will be created)
+ return generateSwitch(builder, true, state, stateCount);
+ }
+ })
+ .elseBlock() // otherwise:
+ .transform(new Function<ElseBuilder<MethodBuilder>, ElseBuilder<MethodBuilder>>() {
+ public ElseBuilder<MethodBuilder> apply(ElseBuilder<MethodBuilder> builder) {
+ // generate the switch in the undefined case (primitive values will not be created)
+ return generateSwitch(builder, false, state, stateCount);
+ }
+ })
+ .endIf();
+ }
+ }
+
+ FutureType testClass = readMethodBuilder
+ .label(END_RECORD)
+ // generating: recordConsumer.end();
+ .exec().get("recordConsumer").callNoParam("end").endExec()
+ .endMethod()
+ .endClass();
+
+ cl.define(testClass);
+ try {
+ Class<?> generated = (Class<?>)cl.loadClass(className);
+ BaseRecordReader<T> compiledRecordReader = (BaseRecordReader<T>)generated.getConstructor().newInstance();
+ try {
+ generated.getField("recordConsumer").set(compiledRecordReader, recordReader.getRecordConsumer());
+ compiledRecordReader.caseLookup = new State[stateCount];
+ for (int i = 0; i < stateCount; i++) {
+ State state = recordReader.getState(i);
+ generated.getField("state_"+i+"_column").set(compiledRecordReader, state.column);
+ generated.getField(primitiveConverterName(i)).set(compiledRecordReader, state.primitiveConverter);
+ for (int j = 0; j < state.groupConverterPath.length; j++) {
+ generated.getField(groupConverterName(i, j)).set(compiledRecordReader, state.groupConverterPath[j]);
+ }
+ compiledRecordReader.caseLookup[i] = state;
+ }
+ } catch (NoSuchFieldException e) {
+ throw new CompilationException("bug: can't find field", e);
+ }
+ compiledRecordReader.recordMaterializer = recordReader.getMaterializer();
+ return compiledRecordReader;
+ } catch (ClassNotFoundException e) {
+ throw new CompilationException("generated class "+className+" could not be loaded", e);
+ } catch (InstantiationException e) {
+ throw new CompilationException("generated class "+className+" could not be instantiated", e);
+ } catch (IllegalAccessException e) {
+ throw new CompilationException("generated class "+className+" is not accessible", e);
+ } catch (IllegalArgumentException e) {
+ throw new CompilationException("generated class "+className+" could not be instantiated", e);
+ } catch (SecurityException e) {
+ throw new CompilationException("generated class "+className+" could not be instantiated", e);
+ } catch (InvocationTargetException e) {
+ throw new CompilationException("generated class "+className+" could not be instantiated", e);
+ } catch (NoSuchMethodException e) {
+ throw new CompilationException("generated class "+className+" could not be instantiated", e);
+ }
+ }
+
+ String groupConverterName(int state, int level) {
+ return "state_" + state + "_groupConverterPath_" + level;
+ }
+
+ String primitiveConverterName(int state) {
+ return "state_"+ state + "_primitive";
+ }
+
+ private <S extends StatementBuilder<S>> Function<S, S> debug(final String message) {
+ return new Function<S, S>() {
+ @Override
+ public S apply(S builder) {
+ if (DEBUG) builder = builder.exec().callOnThis("log").literal(message).endCall().endExec();
+ return builder;
+ }
+
+ };
+ }
+}
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
index 4d66278..a4a993b 100644
--- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
+++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
@@ -444,7 +444,7 @@
return recordMaterializer;
}
- protected Converter getRecordConsumer() {
+ protected GroupConverter getRecordConsumer() {
return recordConsumer;
}
diff --git a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
index 875db02..bf56df7 100644
--- a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
@@ -60,7 +60,8 @@
* @author Julien Le Dem
*/
public static enum PrimitiveTypeName {
- INT64("getLong", Long.TYPE) {
+
+ INT64("Long", Long.TYPE) {
@Override
public String toString(ColumnReader columnReader) {
return String.valueOf(columnReader.getLong());
@@ -83,7 +84,7 @@
return converter.convertINT64(this);
}
},
- INT32("getInteger", Integer.TYPE) {
+ INT32("Integer", Integer.TYPE) {
@Override
public String toString(ColumnReader columnReader) {
return String.valueOf(columnReader.getInteger());
@@ -106,7 +107,7 @@
return converter.convertINT32(this);
}
},
- BOOLEAN("getBoolean", Boolean.TYPE) {
+ BOOLEAN("Boolean", Boolean.TYPE) {
@Override
public String toString(ColumnReader columnReader) {
return String.valueOf(columnReader.getBoolean());
@@ -129,7 +130,7 @@
return converter.convertBOOLEAN(this);
}
},
- BINARY("getBinary", Binary.class) {
+ BINARY("Binary", Binary.class) {
@Override
public String toString(ColumnReader columnReader) {
return String.valueOf(columnReader.getBinary());
@@ -152,7 +153,7 @@
return converter.convertBINARY(this);
}
},
- FLOAT("getFloat", Float.TYPE) {
+ FLOAT("Float", Float.TYPE) {
@Override
public String toString(ColumnReader columnReader) {
return String.valueOf(columnReader.getFloat());
@@ -175,7 +176,7 @@
return converter.convertFLOAT(this);
}
},
- DOUBLE("getDouble", Double.TYPE) {
+ DOUBLE("Double", Double.TYPE) {
@Override
public String toString(ColumnReader columnReader) {
return String.valueOf(columnReader.getDouble());
@@ -244,14 +245,23 @@
}
};
- public final String getMethod;
+ private final String internalName;
public final Class<?> javaType;
- private PrimitiveTypeName(String getMethod, Class<?> javaType) {
- this.getMethod = getMethod;
+ private PrimitiveTypeName(String internalName, Class<?> javaType) {
+ this.internalName = internalName;
this.javaType = javaType;
}
+ public String getMethod() {
+ return "get" + internalName;
+ }
+
+ public String addMethod() {
+ return "add" + internalName;
+ }
+
+
/**
* reads the value from the columnReader with the appropriate accessor and returns a String representation
* @param columnReader
diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java
index 7372941..7424950 100644
--- a/parquet-column/src/test/java/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/parquet/io/PerfTest.java
@@ -35,6 +35,9 @@
/**
* make sure {@link Log#LEVEL} is set to {@link Level#OFF}
*
+ * Run with -verbose:gc -Xmx2g -Xms2g.
+ * There should be no GC logs in between <<<>>>.
+ *
* @author Julien Le Dem
*
*/
@@ -47,17 +50,35 @@
}
private static void read(MemPageStore memPageStore) {
- read(memPageStore, schema, "read all");
- read(memPageStore, schema2, "read projected");
- read(memPageStore, schema3, "read projected no Strings");
+// readDynamic(memPageStore, schema, "read all");
+ readDynamic(memPageStore, schema, "read all");
+// readCompiled(memPageStore, schema, "read all");
+ readCompiled(memPageStore, schema, "read all");
+ readDynamic(memPageStore, schema2, "read projected");
+// readCompiled(memPageStore, schema2, "read projected");
+ readCompiled(memPageStore, schema2, "read projected");
+ readDynamic(memPageStore, schema3, "read projected no Strings");
+// readCompiled(memPageStore, schema3, "read projected no Strings");
+ readCompiled(memPageStore, schema3, "read projected no Strings");
}
+ static void readDynamic(MemPageStore memPageStore, MessageType schema, String message) {
+ read(memPageStore, schema, message, false);
+ }
+ static void readCompiled(MemPageStore memPageStore, MessageType schema, String message) {
+ read(memPageStore, schema, message, true);
+ }
private static void read(MemPageStore memPageStore, MessageType myschema,
- String message) {
+ String message, boolean compiled) {
MessageColumnIO columnIO = newColumnFactory(myschema);
- System.out.println(message);
+
+ System.out.println(message + (compiled ? " compiled" : ""));
RecordMaterializer<Object> recordConsumer = new DummyRecordConverter(myschema);
+
RecordReader<Object> recordReader = columnIO.getRecordReader(memPageStore, recordConsumer);
+ if (compiled) {
+ recordReader = new RecordReaderCompiler().compile((RecordReaderImplementation<Object>)recordReader);
+ }
read(recordReader, 2, myschema);
read(recordReader, 10000, myschema);
diff --git a/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java b/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java
new file mode 100644
index 0000000..988004b
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.io;
+
+import static parquet.example.Paper.r1;
+import static parquet.example.Paper.schema;
+import static parquet.io.TestColumnIO.expectedEventsForR1;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+
+import org.junit.Test;
+
+import parquet.column.ColumnWriteStore;
+import parquet.column.impl.ColumnWriteStoreImpl;
+import parquet.column.page.mem.MemPageStore;
+import parquet.example.data.GroupWriter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.RecordReader;
+import parquet.io.RecordReaderCompiler;
+import parquet.io.RecordReaderImplementation;
+
+public class TestRecordReaderCompiler {
+
+ @Test
+ public void testRecordReaderCompiler() {
+
+ Logger.getLogger("brennus").setLevel(Level.FINEST);
+ Logger.getLogger("brennus").addHandler(new Handler() {
+ public void publish(LogRecord record) {
+ System.err.println(record.getMessage());
+ }
+ public void flush() {
+ System.err.flush();
+ }
+ public void close() throws SecurityException {
+ System.err.flush();
+ }
+ });
+
+ MemPageStore memPageStore = new MemPageStore();
+ ColumnWriteStore writeStore = new ColumnWriteStoreImpl(memPageStore, 1024*1024*1, 1024*1024*1, false);
+ MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+ new GroupWriter(columnIO.getRecordWriter(writeStore), schema).write(r1);
+ writeStore.flush();
+ System.err.flush();
+ Logger.getLogger("brennus").info("compile");
+ System.out.println("compile");
+ RecordReader<Void> recordReader = columnIO.getRecordReader(
+ memPageStore,
+ new ExpectationValidatingConverter(
+ new ArrayDeque<String>(Arrays.asList(expectedEventsForR1)), schema));
+ recordReader = new RecordReaderCompiler().compile((RecordReaderImplementation<Void>)recordReader);
+
+ Logger.getLogger("brennus").info("read");
+ System.out.println("read");
+ recordReader.read();
+
+ }
+}