Merge branch 'master' into fsa_codegen2 Conflicts: parquet-column/pom.xml parquet-column/src/main/java/parquet/example/DummyRecordConverter.java parquet-column/src/main/java/parquet/schema/PrimitiveType.java parquet-column/src/test/java/parquet/io/PerfTest.java
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index f3cd109..42aee96 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml
@@ -36,6 +36,18 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>net.ledem</groupId> + <artifactId>brennus-builder</artifactId> + <version>0.1.0</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>net.ledem</groupId> + <artifactId>brennus-asm</artifactId> + <version>0.1.0</version> + <scope>compile</scope> + </dependency> + <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-encoding</artifactId> <version>${project.version}</version>
diff --git a/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java b/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java index 0bf4ea0..82d5ed5 100644 --- a/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java +++ b/parquet-column/src/main/java/parquet/example/DummyRecordConverter.java
@@ -44,7 +44,6 @@ @Override public Converter convertPrimitiveType(List<GroupType> path, PrimitiveType primitiveType) { return new PrimitiveConverter() { - @Override public void addBinary(Binary value) { a = value;
diff --git a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java index 58450d8..6fee2ae 100644 --- a/parquet-column/src/main/java/parquet/io/BaseRecordReader.java +++ b/parquet-column/src/main/java/parquet/io/BaseRecordReader.java
@@ -20,6 +20,8 @@ import parquet.column.ColumnReadStore; import parquet.io.RecordReaderImplementation.State; import parquet.io.api.Binary; +import parquet.io.api.Converter; +import parquet.io.api.GroupConverter; import parquet.io.api.RecordConsumer; import parquet.io.api.RecordMaterializer; @@ -27,7 +29,6 @@ public abstract class BaseRecordReader<T> extends RecordReader<T> { private static final Log LOG = Log.getLog(BaseRecordReader.class); - public RecordConsumer recordConsumer; public RecordMaterializer<T> recordMaterializer; public ColumnReadStore columnStore; @Override @@ -40,10 +41,6 @@ State[] caseLookup; - private String endField; - - private int endIndex; - protected void currentLevel(int currentLevel) { if (DEBUG) LOG.debug("currentLevel: "+currentLevel); } @@ -56,85 +53,6 @@ return caseLookup[state].getCase(currentLevel, d, nextR).getID(); } - final protected void startMessage() { - // reset state - endField = null; - if (DEBUG) LOG.debug("startMessage()"); - recordConsumer.startMessage(); - } - - final protected void startGroup(String field, int index) { - startField(field, index); - if (DEBUG) LOG.debug("startGroup()"); - recordConsumer.startGroup(); - } - - private void startField(String field, int index) { - if (DEBUG) LOG.debug("startField("+field+","+index+")"); - if (endField != null && index == endIndex) { - // skip the close/open tag - endField = null; - } else { - if (endField != null) { - // close the previous field - recordConsumer.endField(endField, endIndex); - endField = null; - } - recordConsumer.startField(field, index); - } - } - - final protected void addPrimitiveINT64(String field, int index, long value) { - startField(field, index); - if (DEBUG) LOG.debug("addLong("+value+")"); - recordConsumer.addLong(value); - endField(field, index); - } - - private void endField(String field, int index) { - if (DEBUG) LOG.debug("endField("+field+","+index+")"); - if (endField != null) { - recordConsumer.endField(endField, endIndex); - } - endField = field; - endIndex = index; - } - - final protected void addPrimitiveBINARY(String field, int index, Binary value) { - startField(field, index); - if (DEBUG) LOG.debug("addBinary("+value+")"); - recordConsumer.addBinary(value); - endField(field, index); - } - - final protected void addPrimitiveINT32(String field, int index, int value) { - startField(field, index); - if (DEBUG) LOG.debug("addInteger("+value+")"); - recordConsumer.addInteger(value); - endField(field, index); - } - - final protected void endGroup(String field, int index) { - if (endField != null) { - // close the previous field - recordConsumer.endField(endField, endIndex); - endField = null; - } - if (DEBUG) LOG.debug("endGroup()"); - recordConsumer.endGroup(); - endField(field, index); - } - - final protected void endMessage() { - if (endField != null) { - // close the previous field - recordConsumer.endField(endField, endIndex); - endField = null; - } - if (DEBUG) LOG.debug("endMessage()"); - recordConsumer.endMessage(); - } - protected void error(String message) { throw new ParquetDecodingException(message); }
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java b/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java new file mode 100644 index 0000000..72e3dde --- /dev/null +++ b/parquet-column/src/main/java/parquet/io/RecordReaderCompiler.java
@@ -0,0 +1,296 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.io; + +import static brennus.model.ExistingType.INT; +import static brennus.model.ExistingType.VOID; +import static brennus.model.ExistingType.existing; +import static brennus.model.Protection.PUBLIC; +import static parquet.Log.DEBUG; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; + +import parquet.column.ColumnReader; +import parquet.io.RecordReaderImplementation.Case; +import parquet.io.RecordReaderImplementation.State; +import parquet.io.api.GroupConverter; +import parquet.io.api.PrimitiveConverter; + +import brennus.Builder; +import brennus.ClassBuilder; +import brennus.ElseBuilder; +import brennus.Function; +import brennus.MethodBuilder; +import brennus.StatementBuilder; +import brennus.SwitchBuilder; +import brennus.ThenBuilder; +import brennus.asm.ASMTypeGenerator; +import brennus.model.FutureType; +import brennus.printer.TypePrinter; + +public class RecordReaderCompiler { + + private static final String END_RECORD = "endRecord"; + + public static class DynamicClassLoader extends ClassLoader { + ASMTypeGenerator asmTypeGenerator = new ASMTypeGenerator(); + + public Class<?> define(FutureType type) { + if (DEBUG) new TypePrinter().print(type); + byte[] classBytes = asmTypeGenerator.generate(type); + return super.defineClass(type.getName(), classBytes, 0, classBytes.length); + } + } + + private DynamicClassLoader cl = new DynamicClassLoader(); + private int id = 0; + + private <S extends StatementBuilder<S>> S generateCase(boolean addPrimitive, State state, Case currentCase, S builder, int stateCount) { + if (currentCase.isGoingUp()) { + // Generating the following loop: + // for (; currentLevel <= depth; ++currentLevel) { + // currentState.groupConverterPath[currentLevel].start(); + // } + for (int i = currentCase.getStartLevel(); i <= currentCase.getDepth(); i++) { + // Generating the following call: + // currentState.groupConverterPath[currentLevel].start(); + builder = builder.exec().get(groupConverterName(state.id, i)).callNoParam("start").endExec(); + } + } + if (addPrimitive) { + // Generating the following call: + // currentState.primitive.addValueToPrimitiveConverter(currentState.primitiveConverter, columnReader); + builder = builder.exec().get(primitiveConverterName(state.id)) + .call(state.primitive.addMethod()) + .get("value_"+state.id) + .endCall().endExec(); + } + if (currentCase.isGoingDown()) { + // Generating the following loop + // for (; currentLevel > next; currentLevel--) { + // currentState.groupConverterPath[currentLevel - 1].end(); + // } + for (int i = currentCase.getDepth() + 1; i > currentCase.getNextLevel(); i--) { + builder = builder.exec().get(groupConverterName(state.id, i - 1)).callNoParam("end").endExec(); + } + } + // set currentLevel to its new value + if (currentCase.isGoingDown()) { + builder = builder + .set("currentLevel").literal(currentCase.getNextLevel()).endSet(); + } else if (currentCase.isGoingUp()) { + builder = builder + .set("currentLevel").literal(currentCase.getDepth() + 1).endSet(); + } else { + // currentLevel stays the same + } + int nextReader = currentCase.getNextState(); + String label = getStateLabel(stateCount, nextReader); + builder = builder.gotoLabel(label); + return builder; + } + + private String getStateLabel(int stateCount, int stateId) { + return stateId == stateCount ? END_RECORD : "state_" + stateId; + } + + private <S extends StatementBuilder<S>> S generateSwitch(S builder, final boolean defined, final State state, final int stateCount) { + final List<Case> cases = defined ? state.getDefinedCases() : state.getUndefinedCases(); + String columnReader = "state_"+state.id+"_column"; + if (defined) { + // if defined we need to save the current value + builder = builder + .var(existing(state.primitive.javaType), "value_"+state.id) + .set("value_"+state.id).get(columnReader).callNoParam(state.primitive.getMethod()).endSet(); + } + builder = builder + .exec().get(columnReader).callNoParam("consume").endExec(); + if (state.maxRepetitionLevel == 0) { + // in that case nextR is always 0 + builder = builder // TODO: instead change the case lookup code + .set("nextR").literal(0).endSet(); + } else { + builder = builder + .set("nextR").get(columnReader).callNoParam("getCurrentRepetitionLevel").endSet(); + } + if (cases.size() == 1) { + // then no need to switch, directly generate the body of the case + final Case currentCase = cases.get(0); + return generateCase(defined, state, currentCase, builder, stateCount); + } else { + // more than one case: generate a switch + return builder + .switchOn() + .callOnThis("getCaseId").literal(state.id).nextParam().get("currentLevel").nextParam().get("d").nextParam().get("nextR").endCall() + .switchBlock().transform( + new Function<SwitchBuilder<S>, SwitchBuilder<S>>() { + public SwitchBuilder<S> apply(SwitchBuilder<S> builder) { + for (Case currentCase : cases) { + if (currentCase.isGoingUp() || defined || currentCase.isGoingDown()) { + builder = + generateCase(defined, state, currentCase, builder.caseBlock(currentCase.getID()), stateCount) + .endCase(); + } else { + // if nothing to do, directly jump to the next state + String label = getStateLabel(stateCount, currentCase.getNextState()); + builder = builder.gotoLabel(currentCase.getID(), label); + } + } + return builder; + } + }) + .defaultCase() + // a default case to be safe: this should never happen + .exec().callOnThis("error").literal("unknown case").endCall().endExec() + .breakCase() + .endSwitch(); + } + } + + public <T> RecordReader<T> compile(final RecordReaderImplementation<T> recordReader) { + final int stateCount = recordReader.getStateCount(); + // create a unique class name + String className = this.getClass().getName()+"$CompiledRecordReader"+(++id); + ClassBuilder classBuilder = new Builder(false) + .startClass(className, existing(BaseRecordReader.class)) + .field(PUBLIC, existing(GroupConverter.class), "recordConsumer"); // converters root + for (int i = 0; i < stateCount; i++) { + State state = recordReader.getState(i); + // TODO: look into using the actual class. It fails when the class is private. maybe setAccessible? + // create a field for each column reader + classBuilder = classBuilder + .field(PUBLIC, existing(ColumnReader.class), "state_"+ i +"_column") + .field(PUBLIC, existing(PrimitiveConverter.class), primitiveConverterName(i)); // primitiveConverter + + for (int j = 0; j < state.groupConverterPath.length; j++) { + classBuilder = classBuilder + .field(PUBLIC, existing(GroupConverter.class), groupConverterName(i, j)); // groupConverters + } + + } + + MethodBuilder readMethodBuilder = classBuilder + .startMethod(PUBLIC, VOID, "readOneRecord") + // declare variables + .var(INT, "currentLevel") + .var(INT, "d") + .var(INT, "nextR") + // debug statement + .transform(this.<MethodBuilder>debug("startMessage")) + // generating: startMessage(); + .exec().get("recordConsumer").callNoParam("start").endExec() + // initially: currentLevel = 0; + .set("currentLevel").literal(0).endSet(); + for (int i = 0; i < stateCount; i++) { + // generate the code for each state of the FSA + final State state = recordReader.getState(i); + String columnReader = "state_"+i+"_column"; + readMethodBuilder = readMethodBuilder + .label("state_"+i) + .transform(this.<MethodBuilder>debug("state "+i)); + + if (state.maxDefinitionLevel == 0) { + // then it is always defined, we can skip the if + readMethodBuilder = generateSwitch(readMethodBuilder, true, state, stateCount); + } else { + readMethodBuilder = readMethodBuilder + // generating: + // int d = columnReader.getCurrentDefinitionLevel(); + .set("d").get(columnReader).callNoParam("getCurrentDefinitionLevel").endSet() + // if it is defined (d == maxDefinitionLevel) then + .ifExp().get("d").isEqualTo().literal(state.maxDefinitionLevel).thenBlock() + .transform(new Function<ThenBuilder<MethodBuilder>, ThenBuilder<MethodBuilder>>() { + public ThenBuilder<MethodBuilder> apply(ThenBuilder<MethodBuilder> builder) { + // generate The switch in the defined case (primitive values will be created) + return generateSwitch(builder, true, state, stateCount); + } + }) + .elseBlock() // otherwise: + .transform(new Function<ElseBuilder<MethodBuilder>, ElseBuilder<MethodBuilder>>() { + public ElseBuilder<MethodBuilder> apply(ElseBuilder<MethodBuilder> builder) { + // generate The switch in the undefined case (primitive values will not be created) + return generateSwitch(builder, false, state, stateCount); + } + }) + .endIf(); + } + } + + FutureType testClass = readMethodBuilder + .label(END_RECORD) + // endMessage(); + .exec().get("recordConsumer").callNoParam("end").endExec() + .endMethod() + .endClass(); + + cl.define(testClass); + try { + Class<?> generated = (Class<?>)cl.loadClass(className); + BaseRecordReader<T> compiledRecordReader = (BaseRecordReader<T>)generated.getConstructor().newInstance(); + try { + generated.getField("recordConsumer").set(compiledRecordReader, recordReader.getRecordConsumer()); + compiledRecordReader.caseLookup = new State[stateCount]; + for (int i = 0; i < stateCount; i++) { + State state = recordReader.getState(i); + generated.getField("state_"+i+"_column").set(compiledRecordReader, state.column); + generated.getField(primitiveConverterName(i)).set(compiledRecordReader, state.primitiveConverter); + for (int j = 0; j < state.groupConverterPath.length; j++) { + generated.getField(groupConverterName(i, j)).set(compiledRecordReader, state.groupConverterPath[j]); + } + compiledRecordReader.caseLookup[i] = state; + } + } catch (NoSuchFieldException e) { + throw new CompilationException("bug: can't find field", e); + } + compiledRecordReader.recordMaterializer = recordReader.getMaterializer(); + return compiledRecordReader; + } catch (ClassNotFoundException e) { + throw new CompilationException("generated class "+className+" could not be loaded", e); + } catch (InstantiationException e) { + throw new CompilationException("generated class "+className+" could not be instantiated", e); + } catch (IllegalAccessException e) { + throw new CompilationException("generated class "+className+" is not accessible", e); + } catch (IllegalArgumentException e) { + throw new CompilationException("generated class "+className+" could not be instantiated", e); + } catch (SecurityException e) { + throw new CompilationException("generated class "+className+" could not be instantiated", e); + } catch (InvocationTargetException e) { + throw new CompilationException("generated class "+className+" could not be instantiated", e); + } catch (NoSuchMethodException e) { + throw new CompilationException("generated class "+className+" could not be instantiated", e); + } + } + + String groupConverterName(int state, int level) { + return "state_" + state + "_groupConverterPath_" + level; + } + + String primitiveConverterName(int state) { + return "state_"+ state + "_primitive"; + } + + private <S extends StatementBuilder<S>> Function<S, S> debug(final String message) { + return new Function<S, S>() { + @Override + public S apply(S builder) { + if (DEBUG) builder = builder.exec().callOnThis("log").literal(message).endCall().endExec(); + return builder; + } + + }; + } +}
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java index 4d66278..a4a993b 100644 --- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java +++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
@@ -444,7 +444,7 @@ return recordMaterializer; } - protected Converter getRecordConsumer() { + protected GroupConverter getRecordConsumer() { return recordConsumer; }
diff --git a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java index 875db02..bf56df7 100644 --- a/parquet-column/src/main/java/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/parquet/schema/PrimitiveType.java
@@ -60,7 +60,8 @@ * @author Julien Le Dem */ public static enum PrimitiveTypeName { - INT64("getLong", Long.TYPE) { + + INT64("Long", Long.TYPE) { @Override public String toString(ColumnReader columnReader) { return String.valueOf(columnReader.getLong()); @@ -83,7 +84,7 @@ return converter.convertINT64(this); } }, - INT32("getInteger", Integer.TYPE) { + INT32("Integer", Integer.TYPE) { @Override public String toString(ColumnReader columnReader) { return String.valueOf(columnReader.getInteger()); @@ -106,7 +107,7 @@ return converter.convertINT32(this); } }, - BOOLEAN("getBoolean", Boolean.TYPE) { + BOOLEAN("Boolean", Boolean.TYPE) { @Override public String toString(ColumnReader columnReader) { return String.valueOf(columnReader.getBoolean()); @@ -129,7 +130,7 @@ return converter.convertBOOLEAN(this); } }, - BINARY("getBinary", Binary.class) { + BINARY("Binary", Binary.class) { @Override public String toString(ColumnReader columnReader) { return String.valueOf(columnReader.getBinary()); @@ -152,7 +153,7 @@ return converter.convertBINARY(this); } }, - FLOAT("getFloat", Float.TYPE) { + FLOAT("Float", Float.TYPE) { @Override public String toString(ColumnReader columnReader) { return String.valueOf(columnReader.getFloat()); @@ -175,7 +176,7 @@ return converter.convertFLOAT(this); } }, - DOUBLE("getDouble", Double.TYPE) { + DOUBLE("Double", Double.TYPE) { @Override public String toString(ColumnReader columnReader) { return String.valueOf(columnReader.getDouble()); @@ -244,14 +245,23 @@ } }; - public final String getMethod; + private final String internalName; public final Class<?> javaType; - private PrimitiveTypeName(String getMethod, Class<?> javaType) { - this.getMethod = getMethod; + private PrimitiveTypeName(String internalName, Class<?> javaType) { + this.internalName = internalName; this.javaType = javaType; } + public String getMethod() { + return "get" + internalName; + } + + public String addMethod() { + return "add" + internalName; + } + + /** * reads the value from the columnReader with the appropriate accessor and returns a String representation * @param columnReader
diff --git a/parquet-column/src/test/java/parquet/io/PerfTest.java b/parquet-column/src/test/java/parquet/io/PerfTest.java index 7372941..7424950 100644 --- a/parquet-column/src/test/java/parquet/io/PerfTest.java +++ b/parquet-column/src/test/java/parquet/io/PerfTest.java
@@ -35,6 +35,9 @@ /** * make sure {@link Log#LEVEL} is set to {@link Level#OFF} * + * run with -verbose:gc -Xmx2g -Xms2g + * There should be no gc logs in between <<<>>> + * * @author Julien Le Dem * */ @@ -47,17 +50,35 @@ } private static void read(MemPageStore memPageStore) { - read(memPageStore, schema, "read all"); - read(memPageStore, schema2, "read projected"); - read(memPageStore, schema3, "read projected no Strings"); +// readDynamic(memPageStore, schema, "read all"); + readDynamic(memPageStore, schema, "read all"); +// readCompiled(memPageStore, schema, "read all"); + readCompiled(memPageStore, schema, "read all"); + readDynamic(memPageStore, schema2, "read projected"); +// readCompiled(memPageStore, schema2, "read projected"); + readCompiled(memPageStore, schema2, "read projected"); + readDynamic(memPageStore, schema3, "read projected no Strings"); +// readCompiled(memPageStore, schema3, "read projected no Strings"); + readCompiled(memPageStore, schema3, "read projected no Strings"); } + static void readDynamic(MemPageStore memPageStore, MessageType schema, String message) { + read(memPageStore, schema, message, false); + } + static void readCompiled(MemPageStore memPageStore, MessageType schema, String message) { + read(memPageStore, schema, message, true); + } private static void read(MemPageStore memPageStore, MessageType myschema, - String message) { + String message, boolean compiled) { MessageColumnIO columnIO = newColumnFactory(myschema); - System.out.println(message); + + System.out.println(message + (compiled ? " compiled" : "")); RecordMaterializer<Object> recordConsumer = new DummyRecordConverter(myschema); + RecordReader<Object> recordReader = columnIO.getRecordReader(memPageStore, recordConsumer); + if (compiled) { + recordReader = new RecordReaderCompiler().compile((RecordReaderImplementation<Object>)recordReader); + } read(recordReader, 2, myschema); read(recordReader, 10000, myschema);
diff --git a/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java b/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java new file mode 100644 index 0000000..988004b --- /dev/null +++ b/parquet-column/src/test/java/parquet/io/TestRecordReaderCompiler.java
@@ -0,0 +1,79 @@ +/** + * Copyright 2012 Twitter, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package parquet.io; + +import static parquet.example.Paper.r1; +import static parquet.example.Paper.schema; +import static parquet.io.TestColumnIO.expectedEventsForR1; + +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + + +import org.junit.Test; + +import parquet.column.ColumnWriteStore; +import parquet.column.impl.ColumnWriteStoreImpl; +import parquet.column.page.mem.MemPageStore; +import parquet.example.data.GroupWriter; +import parquet.io.ColumnIOFactory; +import parquet.io.MessageColumnIO; +import parquet.io.RecordReader; +import parquet.io.RecordReaderCompiler; +import parquet.io.RecordReaderImplementation; + +public class TestRecordReaderCompiler { + + @Test + public void testRecordReaderCompiler() { + + Logger.getLogger("brennus").setLevel(Level.FINEST); + Logger.getLogger("brennus").addHandler(new Handler() { + public void publish(LogRecord record) { + System.err.println(record.getMessage()); + } + public void flush() { + System.err.flush(); + } + public void close() throws SecurityException { + System.err.flush(); + } + }); + + MemPageStore memPageStore = new MemPageStore(); + ColumnWriteStore writeStore = new ColumnWriteStoreImpl(memPageStore, 1024*1024*1, 1024*1024*1, false); + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema); + new GroupWriter(columnIO.getRecordWriter(writeStore), schema).write(r1); + writeStore.flush(); + System.err.flush(); + Logger.getLogger("brennus").info("compile"); + System.out.println("compile"); + RecordReader<Void> recordReader = columnIO.getRecordReader( + memPageStore, + new ExpectationValidatingConverter( + new ArrayDeque<String>(Arrays.asList(expectedEventsForR1)), schema)); + recordReader = new RecordReaderCompiler().compile((RecordReaderImplementation<Void>)recordReader); + + Logger.getLogger("brennus").info("read"); + System.out.println("read"); + recordReader.read(); + + } +}