blob: 74f949eec6e9ecb6142a032910021e1210ddfb18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.avro;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.Union;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.filter.ColumnPredicates;
import org.apache.parquet.filter.ColumnRecordFilter;
import org.apache.parquet.filter.RecordFilter;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestReflectInputOutputFormat {
private static final Logger LOG = LoggerFactory.getLogger(TestReflectInputOutputFormat.class);
public static class Service {
private long date;
private String mechanic;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Service service = (Service) o;
if (date != service.date) return false;
if (!mechanic.equals(service.mechanic)) return false;
return true;
}
@Override
public int hashCode() {
int result = (int) (date ^ (date >>> 32));
result = 31 * result + mechanic.hashCode();
return result;
}
}
public static enum EngineType {
DIESEL, PETROL, ELECTRIC
}
public static class Engine {
private EngineType type;
private float capacity;
private boolean hasTurboCharger;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Engine engine = (Engine) o;
if (Float.compare(engine.capacity, capacity) != 0) return false;
if (hasTurboCharger != engine.hasTurboCharger) return false;
if (type != engine.type) return false;
return true;
}
@Override
public int hashCode() {
int result = type.hashCode();
result = 31 * result + (capacity != +0.0f ? Float.floatToIntBits(capacity) : 0);
result = 31 * result + (hasTurboCharger ? 1 : 0);
return result;
}
}
public static class Stereo extends Extra {
private String make;
private int speakers;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Stereo stereo = (Stereo) o;
if (speakers != stereo.speakers) return false;
if (!make.equals(stereo.make)) return false;
return true;
}
@Override
public int hashCode() {
int result = make.hashCode();
result = 31 * result + speakers;
return result;
}
}
public static class LeatherTrim extends Extra {
private String colour;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LeatherTrim that = (LeatherTrim) o;
if (!colour.equals(that.colour)) return false;
return true;
}
@Override
public int hashCode() {
return colour.hashCode();
}
}
@Union({Void.class, Stereo.class, LeatherTrim.class})
public static class Extra {}
public static class Car {
private long year;
private String registration;
private String make;
private String model;
private byte[] vin;
private int doors;
private Engine engine;
private Extra optionalExtra = null;
@Nullable
private List<Service> serviceHistory = null;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Car car = (Car) o;
if (doors != car.doors) return false;
if (year != car.year) return false;
if (!engine.equals(car.engine)) return false;
if (!make.equals(car.make)) return false;
if (!model.equals(car.model)) return false;
if (optionalExtra != null ? !optionalExtra.equals(car.optionalExtra) : car.optionalExtra != null)
return false;
if (!registration.equals(car.registration)) return false;
if (serviceHistory != null ? !serviceHistory.equals(car.serviceHistory) : car.serviceHistory != null)
return false;
if (!Arrays.equals(vin, car.vin)) return false;
return true;
}
@Override
public int hashCode() {
int result = (int) (year ^ (year >>> 32));
result = 31 * result + registration.hashCode();
result = 31 * result + make.hashCode();
result = 31 * result + model.hashCode();
result = 31 * result + Arrays.hashCode(vin);
result = 31 * result + doors;
result = 31 * result + engine.hashCode();
result = 31 * result + (optionalExtra != null ? optionalExtra.hashCode() : 0);
result = 31 * result + (serviceHistory != null ? serviceHistory.hashCode() : 0);
return result;
}
}
public static class ShortCar {
@Nullable
private String make = null;
private Engine engine;
private long year;
private byte[] vin;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShortCar shortCar = (ShortCar) o;
if (year != shortCar.year) return false;
if (!engine.equals(shortCar.engine)) return false;
if (make != null ? !make.equals(shortCar.make) : shortCar.make != null)
return false;
if (!Arrays.equals(vin, shortCar.vin)) return false;
return true;
}
@Override
public int hashCode() {
int result = make != null ? make.hashCode() : 0;
result = 31 * result + engine.hashCode();
result = 31 * result + (int) (year ^ (year >>> 32));
result = 31 * result + Arrays.hashCode(vin);
return result;
}
}
public static final Schema CAR_SCHEMA = ReflectData.get()//AllowNulls.INSTANCE
.getSchema(Car.class);
public static final Schema SHORT_CAR_SCHEMA = ReflectData.get()//AllowNulls.INSTANCE
.getSchema(ShortCar.class);
public static Car nextRecord(int i) {
Car car = new Car();
car.doors = 2;
car.make = "Tesla";
car.model = String.format("Model X v%d", i % 2);
car.vin = String.format("1VXBR12EXCP%06d", i).getBytes();
car.year = 2014 + i;
car.registration = "California";
LeatherTrim trim = new LeatherTrim();
trim.colour = "black";
car.optionalExtra = trim;
Engine engine = new Engine();
engine.capacity = 85.0f;
engine.type = (i % 2) == 0 ? EngineType.ELECTRIC : EngineType.PETROL;
engine.hasTurboCharger = false;
car.engine = engine;
if (i % 4 == 0) {
Service service = new Service();
service.date = 1374084640;
service.mechanic = "Elon Musk";
car.serviceHistory = Lists.newArrayList();
car.serviceHistory.add(service);
}
return car;
}
public static class MyMapper extends Mapper<LongWritable, Text, Void, Car> {
@Override
public void run(Context context) throws IOException ,InterruptedException {
for (int i = 0; i < 10; i++) {
context.write(null, nextRecord(i));
}
}
}
public static class MyMapper2 extends Mapper<Void, Car, Void, Car> {
@Override
protected void map(Void key, Car car, Context context) throws IOException ,InterruptedException {
// Note: Car can be null because of predicate pushdown defined by an UnboundedRecordFilter (see below)
if (car != null) {
context.write(null, car);
}
}
}
public static class MyMapperShort extends
Mapper<Void, ShortCar, Void, ShortCar> {
@Override
protected void map(Void key, ShortCar car, Context context)
throws IOException, InterruptedException {
// Note: Car can be null because of predicate pushdown defined by an
// UnboundedRecordFilter (see below)
if (car != null) {
context.write(null, car);
}
}
}
public static class ElectricCarFilter implements UnboundRecordFilter {
private final UnboundRecordFilter filter;
public ElectricCarFilter() {
filter = ColumnRecordFilter.column("engine.type", ColumnPredicates.equalTo(org.apache.parquet.avro.EngineType.ELECTRIC));
}
@Override
public RecordFilter bind(Iterable<ColumnReader> readers) {
return filter.bind(readers);
}
}
final Configuration conf = new Configuration();
final Path inputPath = new Path("src/test/java/org/apache/parquet/avro/TestReflectInputOutputFormat.java");
final Path parquetPath = new Path("target/test/hadoop/TestReflectInputOutputFormat/parquet");
final Path outputPath = new Path("target/test/hadoop/TestReflectInputOutputFormat/out");
@Before
public void createParquetFile() throws Exception {
// set up readers and writers not in MR
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
AvroWriteSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
final FileSystem fileSystem = parquetPath.getFileSystem(conf);
fileSystem.delete(parquetPath, true);
fileSystem.delete(outputPath, true);
{
final Job job = new Job(conf, "write");
// input not really used
TextInputFormat.addInputPath(job, inputPath);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(TestReflectInputOutputFormat.MyMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setOutputPath(job, parquetPath);
AvroParquetOutputFormat.setSchema(job, CAR_SCHEMA);
AvroParquetOutputFormat.setAvroDataSupplier(job, ReflectDataSupplier.class);
waitForJob(job);
}
}
@Test
public void testReadWrite() throws Exception {
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
final Job job = new Job(conf, "read");
job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setInputPaths(job, parquetPath);
// Test push-down predicates by using an electric car filter
AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
// Test schema projection by dropping the optional extras
Schema projection = Schema.createRecord(CAR_SCHEMA.getName(),
CAR_SCHEMA.getDoc(), CAR_SCHEMA.getNamespace(), false);
List<Schema.Field> fields = Lists.newArrayList();
for (Schema.Field field : ReflectData.get().getSchema(Car.class).getFields()) {
if (!"optionalExtra".equals(field.name())) {
fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultVal(), field.order()));
}
}
projection.setFields(fields);
AvroParquetInputFormat.setRequestedProjection(job, projection);
job.setMapperClass(TestReflectInputOutputFormat.MyMapper2.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setOutputPath(job, outputPath);
AvroParquetOutputFormat.setSchema(job, CAR_SCHEMA);
waitForJob(job);
final Path mapperOutput = new Path(outputPath.toString(),
"part-m-00000.parquet");
final AvroParquetReader<Car> out = new AvroParquetReader<Car>(conf, mapperOutput);
Car car;
Car previousCar = null;
int lineNumber = 0;
while ((car = out.read()) != null) {
if (previousCar != null) {
// Testing reference equality here. The "model" field should be dictionary-encoded.
assertTrue(car.model == previousCar.model);
}
// Make sure that predicate push down worked as expected
if (car.engine.type == EngineType.PETROL) {
fail("UnboundRecordFilter failed to remove cars with PETROL engines");
}
// Note we use lineNumber * 2 because of predicate push down
Car expectedCar = nextRecord(lineNumber * 2);
// We removed the optional extra field using projection so we shouldn't
// see it here...
expectedCar.optionalExtra = null;
assertEquals("line " + lineNumber, expectedCar, car);
++lineNumber;
previousCar = car;
}
out.close();
}
@Test
public void testReadWriteChangedCar() throws Exception {
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
final Job job = new Job(conf, "read changed/short");
job.setInputFormatClass(AvroParquetInputFormat.class);
AvroParquetInputFormat.setInputPaths(job, parquetPath);
// Test push-down predicates by using an electric car filter
AvroParquetInputFormat.setUnboundRecordFilter(job, ElectricCarFilter.class);
// Test schema projection by dropping the engine, year, and vin (like ShortCar),
// but making make optional (unlike ShortCar)
Schema projection = Schema.createRecord(CAR_SCHEMA.getName(),
CAR_SCHEMA.getDoc(), CAR_SCHEMA.getNamespace(), false);
List<Schema.Field> fields = Lists.newArrayList();
for (Schema.Field field : CAR_SCHEMA.getFields()) {
// No make!
if ("engine".equals(field.name()) || "year".equals(field.name()) || "vin".equals(field.name())) {
fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
field.defaultVal(), field.order()));
}
}
projection.setFields(fields);
AvroParquetInputFormat.setRequestedProjection(job, projection);
AvroParquetInputFormat.setAvroReadSchema(job, SHORT_CAR_SCHEMA);
job.setMapperClass(TestReflectInputOutputFormat.MyMapperShort.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(AvroParquetOutputFormat.class);
AvroParquetOutputFormat.setOutputPath(job, outputPath);
AvroParquetOutputFormat.setSchema(job, SHORT_CAR_SCHEMA);
waitForJob(job);
final Path mapperOutput = new Path(outputPath.toString(), "part-m-00000.parquet");
final AvroParquetReader<ShortCar> out = new AvroParquetReader<ShortCar>(conf, mapperOutput);
ShortCar car;
int lineNumber = 0;
while ((car = out.read()) != null) {
// Make sure that predicate push down worked as expected
// Note we use lineNumber * 2 because of predicate push down
Car expectedCar = nextRecord(lineNumber * 2);
// We removed the optional extra field using projection so we shouldn't see it here...
assertNull(car.make);
assertEquals(car.engine, expectedCar.engine);
assertEquals(car.year, expectedCar.year);
assertArrayEquals(car.vin, expectedCar.vin);
++lineNumber;
}
out.close();
}
private void waitForJob(Job job) throws Exception {
job.submit();
while (!job.isComplete()) {
LOG.debug("waiting for job {}", job.getJobName());
sleep(100);
}
LOG.info("status for job {}: {}", job.getJobName(), (job.isSuccessful() ? "SUCCESS" : "FAILURE"));
if (!job.isSuccessful()) {
throw new RuntimeException("job failed " + job.getJobName());
}
}
@After
public void deleteOutputFile() throws IOException {
final FileSystem fileSystem = parquetPath.getFileSystem(conf);
fileSystem.delete(parquetPath, true);
fileSystem.delete(outputPath, true);
}
}