/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.avro;

import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.filter.ColumnPredicates;
import org.apache.parquet.filter.ColumnRecordFilter;
import org.apache.parquet.filter.RecordFilter;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestSpecificInputOutputFormat {
private static final Logger LOG = LoggerFactory.getLogger(TestSpecificInputOutputFormat.class);
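/**
 * Builds a deterministic Car record for index i: even indexes get an ELECTRIC
 * engine, odd indexes get PETROL, and every fourth record carries a single
 * service-history entry. The tests below rely on this pattern.
 */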
public static Car nextRecord(int i) {
String vin = "1VXBR12EXCP000000";
Car.Builder carBuilder = Car.newBuilder()
.setDoors(2)
.setMake("Tesla")
.setModel(String.format("Model X v%d", i % 2))
.setVin(new Vin(vin.getBytes()))
.setYear(2014 + i)
.setOptionalExtra(LeatherTrim.newBuilder().setColour("black").build())
.setRegistration("California");
Engine.Builder engineBuilder = Engine.newBuilder()
.setCapacity(85.0f)
.setHasTurboCharger(false);
if (i % 2 == 0) {
engineBuilder.setType(EngineType.ELECTRIC);
} else {
engineBuilder.setType(EngineType.PETROL);
}
carBuilder.setEngine(engineBuilder.build());
if (i % 4 == 0) {
List<Service> serviceList = Lists.newArrayList();
serviceList.add(Service.newBuilder()
.setDate(1374084640)
.setMechanic("Elon Musk").build());
carBuilder.setServiceHistory(serviceList);
}
return carBuilder.build();
}
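/**
 * Emits ten generated Car records with null keys (the Parquet output format
 * only consumes the value side). Overrides run() directly, so the text input
 * is never actually read.
 */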
public static class MyMapper extends Mapper<LongWritable, Text, Void, Car> {
@Override
public void run(Context context) throws IOException, InterruptedException {
for (int i = 0; i < 10; i++) {
context.write(null, nextRecord(i));
}
}
}
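/**
 * Identity mapper for Car records. Records dropped by the unbound record
 * filter arrive as null and are skipped rather than written.
 */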
public static class MyMapper2 extends Mapper<Void, Car, Void, Car> {
@Override
protected void map(Void key, Car car, Context context) throws IOException, InterruptedException {
// Note: car can be null because of the predicate pushdown defined by an UnboundRecordFilter (see ElectricCarFilter below)
if (car != null) {
context.write(null, car);
}
}
}
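/**
 * Same identity mapping as MyMapper2, but over the evolved ShortCar schema.
 */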
public static class MyMapperShort extends
Mapper<Void, ShortCar, Void, ShortCar> {
@Override
protected void map(Void key, ShortCar car, Context context)
throws IOException, InterruptedException {
// Note: car can be null because of the predicate pushdown defined by an
// UnboundRecordFilter (see ElectricCarFilter below)
if (car != null) {
context.write(null, car);
}
}
}
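/**
 * Keeps only records whose engine.type column equals ELECTRIC. The input
 * format instantiates this class reflectively, hence the public no-arg
 * constructor.
 */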
public static class ElectricCarFilter implements UnboundRecordFilter {
private final UnboundRecordFilter filter;
public ElectricCarFilter() {
filter = ColumnRecordFilter.column("engine.type", ColumnPredicates.equalTo(EngineType.ELECTRIC));
}
@Override
public RecordFilter bind(Iterable<ColumnReader> readers) {
return filter.bind(readers);
}
}
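// A minimal illustrative sketch, not used by the tests: the same
// ColumnRecordFilter/ColumnPredicates API works for any comparable column,
// e.g. the top-level "make" string field. The MakeFilter name is ours.
public static class MakeFilter implements UnboundRecordFilter {
private final UnboundRecordFilter filter =
ColumnRecordFilter.column("make", ColumnPredicates.equalTo("Tesla"));
@Override
public RecordFilter bind(Iterable<ColumnReader> readers) {
return filter.bind(readers);
}
}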
final Configuration conf = new Configuration();
final Path inputPath = new Path("src/test/java/org/apache/parquet/avro/TestSpecificInputOutputFormat.java");
final Path parquetPath = new Path("target/test/hadoop/TestSpecificInputOutputFormat/parquet");
final Path outputPath = new Path("target/test/hadoop/TestSpecificInputOutputFormat/out");
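/**
 * Runs a small MR job before each test to write the ten generated Car
 * records to parquetPath, clearing output from any previous run first.
 */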
@Before
public void createParquetFile() throws Exception {
final FileSystem fileSystem = parquetPath.getFileSystem(conf);
fileSystem.delete(parquetPath, true);
fileSystem.delete(outputPath, true);
{
final Job job = new Job(conf, "write");
// the text input only exists to drive the mapper; its contents are ignored
TextInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(TestSpecificInputOutputFormat.MyMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setOutputPath(job, parquetPath);
AvroParquetOutputFormat.setSchema(job, Car.SCHEMA$);
waitForJob(job);
}
}
@Test
public void testReadWrite() throws Exception {
final Job job = new Job(conf, "read");
job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setInputPaths(job, parquetPath);
// Test push-down predicates by using an electric car filter
AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
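// With this record-level filter API, filtered-out records surface as null
// on read, which is why MyMapper2 checks for null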
// Test schema projection by dropping the optional extras
Schema projection = Schema.createRecord(Car.SCHEMA$.getName(),
Car.SCHEMA$.getDoc(), Car.SCHEMA$.getNamespace(), false);
List<Schema.Field> fields = Lists.newArrayList();
for (Schema.Field field : Car.SCHEMA$.getFields()) {
if (!"optionalExtra".equals(field.name())) {
fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultVal(), field.order()));
}
}
projection.setFields(fields);
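// Columns left out of the requested projection are never read from disk;
// the corresponding Avro fields come back as null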
AvroParquetInputFormat.setRequestedProjection(job, projection);
job.setMapperClass(TestSpecificInputOutputFormat.MyMapper2.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setOutputPath(job, outputPath);
AvroParquetOutputFormat.setSchema(job, Car.SCHEMA$);
waitForJob(job);
final Path mapperOutput = new Path(outputPath.toString(),
"part-m-00000.parquet");
final AvroParquetReader<Car> out = new AvroParquetReader<Car>(mapperOutput);
Car car;
Car previousCar = null;
int lineNumber = 0;
while ((car = out.read()) != null) {
if (previousCar != null) {
// Testing reference equality on purpose: "model" should be dictionary-encoded,
// and each dictionary entry is decoded to a single shared String instance.
assertTrue(car.getModel() == previousCar.getModel());
}
// Make sure that predicate push down worked as expected
if (car.getEngine().getType() == EngineType.PETROL) {
fail("UnboundRecordFilter failed to remove cars with PETROL engines");
}
// The filter kept only the even-numbered (electric) records, hence lineNumber * 2
Car expectedCar = nextRecord(lineNumber * 2);
// We removed the optional extra field using projection so we shouldn't
// see it here...
expectedCar.setOptionalExtra(null);
assertEquals("line " + lineNumber, expectedCar, car);
++lineNumber;
previousCar = car;
}
out.close();
}
@Test
public void testReadWriteChangedCar() throws Exception {
final Job job = new Job(conf, "read changed/short");
job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setInputPaths(job, parquetPath);
// Test push-down predicates by using an electric car filter
AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
// Test schema projection by keeping only the engine, year, and vin
// (the ShortCar fields minus make), so make should read back as null
Schema projection = Schema.createRecord(Car.SCHEMA$.getName(),
Car.SCHEMA$.getDoc(), Car.SCHEMA$.getNamespace(), false);
List<Schema.Field> fields = Lists.newArrayList();
for (Schema.Field field : Car.SCHEMA$.getFields()) {
// No make!
if ("engine".equals(field.name()) || "year".equals(field.name()) || "vin".equals(field.name())) {
fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultVal(), field.order()));
}
}
projection.setFields(fields);
AvroParquetInputFormat.setRequestedProjection(job, projection);
AvroParquetInputFormat.setAvroReadSchema(job, ShortCar.SCHEMA$);
job.setMapperClass(TestSpecificInputOutputFormat.MyMapperShort.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setOutputPath(job, outputPath);
AvroParquetOutputFormat.setSchema(job, ShortCar.SCHEMA$);
waitForJob(job);
final Path mapperOutput = new Path(outputPath.toString(), "part-m-00000.parquet");
final AvroParquetReader<ShortCar> out = new AvroParquetReader<ShortCar>(mapperOutput);
ShortCar car;
int lineNumber = 0;
while ((car = out.read()) != null) {
// Make sure that predicate pushdown worked as expected: the filter kept
// only the even-numbered (electric) records, hence lineNumber * 2
Car expectedCar = nextRecord(lineNumber * 2);
// The projection dropped the make field, so it should read back as null
assertNull(car.getMake());
assertEquals(expectedCar.getEngine(), car.getEngine());
assertEquals(expectedCar.getYear(), car.getYear());
assertEquals(expectedCar.getVin(), car.getVin());
++lineNumber;
}
out.close();
}
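/**
 * Submits the job and polls until it completes, failing fast if the job was
 * unsuccessful.
 */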
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job {}", job.getJobName());
sleep(100);
}
LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
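/**
 * Cleans up both the generated parquet input and the job output after each
 * test.
 */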
@After
public void deleteOutputFile() throws IOException {
final FileSystem fileSystem = parquetPath.getFileSystem(conf);
fileSystem.delete(parquetPath, true);
fileSystem.delete(outputPath, true);
}
}