/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.pig;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.pig.LoadFunc;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreMetadata;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Uses the ParquetLoader and ParquetStorer directly, bypassing the MapReduce
 * scheduling overhead, to measure read performance for a varying number of columns.
 *
 * @author Julien Le Dem
 */
public class PerfTest2 {

  private static final Logger LOG = LoggerFactory.getLogger(PerfTest2.class);
  private static final boolean DEBUG = LOG.isDebugEnabled();

  static final int COLUMN_COUNT = 50;
  private static final long ROW_COUNT = 100000;

  private static Configuration conf = new Configuration();
  private static int jobid = 0;
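
  /**
   * Writes a file of COLUMN_COUNT chararray columns and ROW_COUNT rows, then
   * times loading back 1 to COLUMN_COUNT columns, over two passes.
   */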
  public static void main(String[] args) throws Exception {
    StringBuilder results = new StringBuilder();
    String out = "target/PerfTest2";
    File outDir = new File(out);
    if (outDir.exists()) {
      clean(outDir);
    }
    write(out);
    for (int i = 0; i < 2; i++) {
      load(out, 1, results);
      load(out, 2, results);
      load(out, 3, results);
      load(out, 4, results);
      load(out, 5, results);
      load(out, 10, results);
      load(out, 20, results);
      load(out, 50, results);
      results.append("\n");
    }
    System.out.println(results);
  }
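
  /**
   * Writes ROW_COUNT rows of COLUMN_COUNT chararray columns to the given location,
   * driving a ParquetStorer through the Hadoop OutputFormat API directly.
   */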
  public static void write(String out) throws IOException, ParserException,
      InterruptedException, ExecException {
    {
      StringBuilder schemaString = new StringBuilder("a0: chararray");
      for (int i = 1; i < COLUMN_COUNT; i++) {
        schemaString.append(", a" + i + ": chararray");
      }
      String location = out;
      String schema = schemaString.toString();
      StoreFuncInterface storer = new ParquetStorer();
      Job job = new Job(conf);
      storer.setStoreFuncUDFContextSignature("sig");
      String absPath = storer.relToAbsPathForStoreLocation(location, new Path(new File(".").getAbsoluteFile().toURI()));
      storer.setStoreLocation(absPath, job);
      storer.checkSchema(new ResourceSchema(Utils.getSchemaFromString(schema)));
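      // ask the storer for its Hadoop output format and validate the output location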
      @SuppressWarnings("unchecked") // that's how the base class is defined
      OutputFormat<Void, Tuple> outputFormat = storer.getOutputFormat();
      // use the job's configuration (via ContextUtil), not the original conf:
      // setStoreLocation configured the job's copy, not conf itself
      int writeJobId = jobid++;
      JobContext jobContext = ContextUtil.newJobContext(ContextUtil.getConfiguration(job), new JobID("jt", writeJobId));
      outputFormat.checkOutputSpecs(jobContext);
      if (schema != null) {
        ResourceSchema resourceSchema = new ResourceSchema(Utils.getSchemaFromString(schema));
        storer.checkSchema(resourceSchema);
        if (storer instanceof StoreMetadata) {
          ((StoreMetadata) storer).storeSchema(resourceSchema, absPath, job);
        }
      }
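      // simulate a single task attempt and write all rows through the storer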
      TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID("jt", writeJobId, true, 1, 0));
      RecordWriter<Void, Tuple> recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
      storer.prepareToWrite(recordWriter);
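      // each tuple holds COLUMN_COUNT generated string values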
      for (int i = 0; i < ROW_COUNT; i++) {
        Tuple tuple = TupleFactory.getInstance().newTuple(COLUMN_COUNT);
        for (int j = 0; j < COLUMN_COUNT; j++) {
          tuple.set(j, "a" + i + "_" + j);
        }
        storer.putNext(tuple);
      }
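      // close the writer, then commit the task and the job to finalize the output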
      recordWriter.close(taskAttemptContext);
      OutputCommitter outputCommitter = outputFormat.getOutputCommitter(taskAttemptContext);
      outputCommitter.commitTask(taskAttemptContext);
      outputCommitter.commitJob(jobContext);
    }
  }
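
  /**
   * Recursively deletes the given file or directory.
   */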
  static void clean(File outDir) {
    if (outDir.isDirectory()) {
      File[] listFiles = outDir.listFiles();
      if (listFiles != null) { // listFiles() returns null on I/O error
        for (File file : listFiles) {
          clean(file);
        }
      }
    }
    outDir.delete();
  }
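
  /**
   * Loads the first colsToLoad columns back through a ParquetLoader and appends
   * the elapsed time to results.
   */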
  static void load(String out, int colsToLoad, StringBuilder results) throws Exception {
    StringBuilder schemaString = new StringBuilder("a0: chararray");
    for (int i = 1; i < colsToLoad; i++) {
      schemaString.append(", a" + i + ": chararray");
    }
    long t0 = System.currentTimeMillis();
    Job job = new Job(conf);
    int loadjobId = jobid++;
    LoadFunc loadFunc = new ParquetLoader(schemaString.toString());
    loadFunc.setUDFContextSignature("sigLoader" + loadjobId);
    String absPath = loadFunc.relativeToAbsolutePath(out, new Path(new File(".").getAbsoluteFile().toURI()));
    loadFunc.setLocation(absPath, job);
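    // get the loader's input format and compute the splits for the written file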
    @SuppressWarnings("unchecked") // that's how the base class is defined
    InputFormat<Void, Tuple> inputFormat = loadFunc.getInputFormat();
    JobContext jobContext = ContextUtil.newJobContext(ContextUtil.getConfiguration(job), new JobID("jt", loadjobId));
    List<InputSplit> splits = inputFormat.getSplits(jobContext);
    int i = 0;
    int taskid = 0;
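    // read every split with its own task attempt context, counting the tuples returned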
    for (InputSplit split : splits) {
      TaskAttemptContext taskAttemptContext = ContextUtil.newTaskAttemptContext(ContextUtil.getConfiguration(job), new TaskAttemptID("jt", loadjobId, true, taskid++, 0));
      RecordReader<Void, Tuple> recordReader = inputFormat.createRecordReader(split, taskAttemptContext);
      loadFunc.prepareToRead(recordReader, null);
      recordReader.initialize(split, taskAttemptContext);
      Tuple t;
      while ((t = loadFunc.getNext()) != null) {
        if (DEBUG) System.out.println(t);
        ++i;
      }
    }
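    // every written row must be read back exactly once, no matter how many columns were requested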
    assertEquals(ROW_COUNT, i);
    long t1 = System.currentTimeMillis();
    results.append((t1 - t0) + " ms to read " + colsToLoad + " columns\n");
  }
}