/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.pig;

import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;

import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.mem.MemPageStore;
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.schema.MessageType;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;

/**
 * Performance test for reading and writing Pig tuples through Parquet.
 * Make sure logging is set to {@link Level#OFF} before running, so that
 * log output does not skew the timings.
 *
 * @author Julien Le Dem
 */
public class TupleConsumerPerfTest {
private static final int TOP_LEVEL_COLS = 1;
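
  /** Writes sample rows to an in-memory page store, then times reading them back with three different projections. */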
public static void main(String[] args) throws Exception {
String pigSchema = pigSchema(false, false);
String pigSchemaProjected = pigSchema(true, false);
String pigSchemaNoString = pigSchema(true, true);
MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchema));
MemPageStore memPageStore = new MemPageStore(0);
ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(
memPageStore, ParquetProperties.builder()
.withPageSize(50*1024*1024)
.withDictionaryEncoding(false)
.build());
write(memPageStore, columns, schema, pigSchema);
columns.flush();
read(memPageStore, pigSchema, pigSchemaProjected, pigSchemaNoString);
System.out.println(columns.getBufferedSize()+" bytes used total");
System.out.println("max col size: "+columns.maxColMemSize()+" bytes");
}
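
  /**
   * Builds the Pig schema string: one nested tuple per top-level column, each with
   * chararray, long, and bag-of-int fields. {@code projected} halves the field counts;
   * {@code noStrings} drops the chararray fields entirely.
   */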
private static String pigSchema(boolean projected, boolean noStrings) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < TOP_LEVEL_COLS; i++) {
if (i!=0) {
sb.append(", ");
}
sb.append("i"+i+":(");
if (!noStrings) {
for (int j = 0; j < (projected ? 2 : 4); j++) {
if (j!=0) {
sb.append(", ");
}
sb.append("j"+j+":chararray");
}
sb.append(", ");
}
for (int k = 0; k < (projected ? 2 : 4); k++) {
if (k!=0) {
sb.append(", ");
}
sb.append("k"+k+":long");
}
for (int l = 0; l < (projected ? 1 : 2); l++) {
sb.append(", ");
sb.append("l"+l+":{t:(v:int)}");
}
sb.append(")");
}
return sb.toString();
}
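
  /** Creates one sample row matching the full schema: 4 strings, 4 longs, and 2 bags of 10 single-int tuples. */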
private static Tuple tuple() throws ExecException {
TupleFactory tf = TupleFactory.getInstance();
Tuple t = tf.newTuple(TOP_LEVEL_COLS);
for (int i = 0; i < TOP_LEVEL_COLS; i++) {
Tuple ti = tf.newTuple(10);
for (int j = 0; j < 4; j++) {
ti.set(j, "foo"+i+","+j);
}
for (int k = 0; k < 4; k++) {
ti.set(4+k, (long)k);
}
for (int l = 0; l < 2; l++) {
DataBag bag = new NonSpillableDataBag();
for (int m = 0; m < 10; m++) {
          bag.add(tf.newTuple((Object) Integer.valueOf(m)));
}
ti.set(8+l, bag);
}
t.set(i, ti);
}
return t;
}
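
  /** Reads the data back three ways: the full schema, a projected schema, and a projection without strings. */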
private static void read(PageReadStore columns, String pigSchemaString, String pigSchemaProjected, String pigSchemaProjectedNoStrings) throws ParserException {
read(columns, pigSchemaString, "read all");
read(columns, pigSchemaProjected, "read projected");
read(columns, pigSchemaProjectedNoStrings, "read projected no Strings");
}
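
  /** Sets up a record reader for the given projection, then times five 10k passes (warm-up) followed by 100k and 1M passes. */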
private static void read(PageReadStore columns, String pigSchemaString, String message) throws ParserException {
System.out.println(message);
MessageColumnIO columnIO = newColumnFactory(pigSchemaString);
TupleReadSupport tupleReadSupport = new TupleReadSupport();
Map<String, String> pigMetaData = pigMetaData(pigSchemaString);
MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchemaString));
ReadContext init = tupleReadSupport.init(null, pigMetaData, schema);
RecordMaterializer<Tuple> recordConsumer = tupleReadSupport.prepareForRead(null, pigMetaData, schema, init);
RecordReader<Tuple> recordReader = columnIO.getRecordReader(columns, recordConsumer);
// TODO: put this back
// if (DEBUG) {
// recordConsumer = new RecordConsumerLoggingWrapper(recordConsumer);
// }
read(recordReader, 10000, pigSchemaString);
read(recordReader, 10000, pigSchemaString);
read(recordReader, 10000, pigSchemaString);
read(recordReader, 10000, pigSchemaString);
read(recordReader, 10000, pigSchemaString);
read(recordReader, 100000, pigSchemaString);
read(recordReader, 1000000, pigSchemaString);
System.out.println();
}
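
  /** Wraps the Pig schema string into the metadata map consumed by {@link TupleReadSupport}. */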
private static Map<String, String> pigMetaData(String pigSchemaString) {
Map<String, String> map = new HashMap<String, String>();
new PigMetaData(pigSchemaString).addToMetaData(map);
return map;
}
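
  /** Sets up a tuple writer over the column store, then times the same sequence of write passes as the read side. */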
private static void write(MemPageStore memPageStore, ColumnWriteStoreV1 columns, MessageType schema, String pigSchemaString) throws ExecException, ParserException {
MessageColumnIO columnIO = newColumnFactory(pigSchemaString);
TupleWriteSupport tupleWriter = TupleWriteSupport.fromPigSchema(pigSchemaString);
tupleWriter.init(null);
tupleWriter.prepareForWrite(columnIO.getRecordWriter(columns));
write(memPageStore, tupleWriter, 10000);
write(memPageStore, tupleWriter, 10000);
write(memPageStore, tupleWriter, 10000);
write(memPageStore, tupleWriter, 10000);
write(memPageStore, tupleWriter, 10000);
write(memPageStore, tupleWriter, 100000);
write(memPageStore, tupleWriter, 1000000);
System.out.println();
}
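
  /** Converts the Pig schema to a Parquet {@link MessageType} and builds the matching column I/O. */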
private static MessageColumnIO newColumnFactory(String pigSchemaString) throws ParserException {
MessageType schema = new PigSchemaConverter().convert(Utils.getSchemaFromString(pigSchemaString));
return new ColumnIOFactory().getColumnIO(schema);
}
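
  /** Reads {@code count} records and prints the throughput of the pass. */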
  private static void read(RecordReader<Tuple> recordReader, int count, String pigSchemaString) throws ParserException {
    long t0 = System.currentTimeMillis();
    Tuple tuple = null;
    for (int i = 0; i < count; i++) {
      tuple = recordReader.read();
    }
    if (tuple == null) {
      throw new RuntimeException("last tuple read was null");
    }
    long t1 = System.currentTimeMillis();
    long t = t1 - t0;
    float err = t == 0 ? 0 : (float) 100 * 2 / t; // error from the +/- 1 ms timer resolution
    System.out.printf("read %,9d recs in %,5d ms at %,9d rec/s err: %3.2f%%\n", count, t, t == 0 ? 0 : count * 1000 / t, err);
  }
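
  /** Writes {@code count} copies of the sample tuple and prints the throughput of the pass. */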
private static void write(MemPageStore memPageStore, TupleWriteSupport tupleWriter, int count) throws ExecException {
Tuple tu = tuple();
long t0 = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
tupleWriter.write(tu);
}
long t1 = System.currentTimeMillis();
long t = t1-t0;
memPageStore.addRowCount(count);
System.out.printf("written %,9d recs in %,5d ms at %,9d rec/s\n", count, t, t == 0 ? 0 : count * 1000 / t );
}
}