| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.parquet.avro; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Lists; |
| import com.google.common.io.Resources; |
| import java.io.File; |
| import java.io.IOException; |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import org.apache.avro.Conversions; |
| import org.apache.avro.LogicalTypes; |
| import org.apache.avro.Schema; |
| import org.apache.avro.SchemaBuilder; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.generic.GenericFixed; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.generic.GenericRecordBuilder; |
| import org.apache.avro.util.Utf8; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.parquet.hadoop.ParquetReader; |
| import org.apache.parquet.hadoop.ParquetWriter; |
| import org.apache.parquet.hadoop.api.WriteSupport; |
| import org.apache.parquet.io.api.Binary; |
| import org.apache.parquet.io.api.RecordConsumer; |
| import org.apache.parquet.schema.MessageTypeParser; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import static org.apache.parquet.avro.AvroTestUtil.optional; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| |
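/**
 * Round-trip tests that write Avro {@link GenericRecord}s to Parquet with
 * {@link AvroParquetWriter} and read them back with {@link AvroParquetReader},
 * parameterized to cover both the old (compat) and the new converters.
 */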
| @RunWith(Parameterized.class) |
| public class TestReadWrite { |
| |
| @Parameterized.Parameters |
| public static Collection<Object[]> data() { |
| Object[][] data = new Object[][] { |
| { false }, // use the new converters |
| { true } }; // use the old converters |
| return Arrays.asList(data); |
| } |
| |
| private final boolean compat; |
| private final Configuration testConf = new Configuration(); |
| |
| public TestReadWrite(boolean compat) { |
| this.compat = compat; |
| this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat); |
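    // Use the new three-level list structure when writing, and do not wrap
    // list elements in synthetic records when converting schemas, so that
    // arrays of optional (nullable) elements round-trip correctly.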
| testConf.setBoolean("parquet.avro.add-list-element-records", false); |
| testConf.setBoolean("parquet.avro.write-old-list-structure", false); |
| } |
| |
| @Test |
| public void testEmptyArray() throws Exception { |
| Schema schema = new Schema.Parser().parse( |
| Resources.getResource("array.avsc").openStream()); |
| |
| // Write a record with an empty array. |
| List<Integer> emptyArray = new ArrayList<>(); |
| |
| Path file = new Path(createTempFile().getPath()); |
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(file) |
| .withSchema(schema) |
| .withConf(testConf) |
| .build()) { |
| GenericData.Record record = new GenericRecordBuilder(schema) |
| .set("myarray", emptyArray).build(); |
| writer.write(record); |
| } |
| |
| try (AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(testConf, file)) { |
| GenericRecord nextRecord = reader.read(); |
| |
| assertNotNull(nextRecord); |
| assertEquals(emptyArray, nextRecord.get("myarray")); |
| } |
| } |
| |
| @Test |
| public void testEmptyMap() throws Exception { |
| Schema schema = new Schema.Parser().parse( |
| Resources.getResource("map.avsc").openStream()); |
| |
| Path file = new Path(createTempFile().getPath()); |
    ImmutableMap<String, Integer> emptyMap = ImmutableMap.of();
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(file) |
| .withSchema(schema) |
| .withConf(testConf) |
| .build()) { |
| |
| // Write a record with an empty map. |
| GenericData.Record record = new GenericRecordBuilder(schema) |
| .set("mymap", emptyMap).build(); |
| writer.write(record); |
| } |
| |
| try(AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file)) { |
| GenericRecord nextRecord = reader.read(); |
| |
| assertNotNull(nextRecord); |
| assertEquals(emptyMap, nextRecord.get("mymap")); |
| } |
| } |
| |
| @Test |
| public void testMapWithNulls() throws Exception { |
| Schema schema = new Schema.Parser().parse( |
| Resources.getResource("map_with_nulls.avsc").openStream()); |
| |
| Path file = new Path(createTempFile().getPath()); |
| |
| // Write a record with a null value |
| Map<CharSequence, Integer> map = new HashMap<>(); |
| map.put(str("thirty-four"), 34); |
| map.put(str("eleventy-one"), null); |
| map.put(str("one-hundred"), 100); |
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(file) |
| .withSchema(schema) |
| .withConf(testConf) |
| .build()) { |
| |
| GenericData.Record record = new GenericRecordBuilder(schema) |
| .set("mymap", map).build(); |
| writer.write(record); |
| } |
| |
| try(AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(testConf, file)) { |
| GenericRecord nextRecord = reader.read(); |
| |
| assertNotNull(nextRecord); |
| assertEquals(map, nextRecord.get("mymap")); |
| } |
| } |
| |
| @Test(expected=RuntimeException.class) |
| public void testMapRequiredValueWithNull() throws Exception { |
| Schema schema = Schema.createRecord("record1", null, null, false); |
| schema.setFields(Lists.newArrayList( |
| new Schema.Field("mymap", Schema.createMap(Schema.create(Schema.Type.INT)), null, null))); |
| |
| Path file = new Path(createTempFile().getPath()); |
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(file) |
| .withSchema(schema) |
| .withConf(testConf) |
| .build()) { |
| |
| // Write a record with a null value |
| Map<String, Integer> map = new HashMap<String, Integer>(); |
| map.put("thirty-four", 34); |
| map.put("eleventy-one", null); |
| map.put("one-hundred", 100); |
| |
| GenericData.Record record = new GenericRecordBuilder(schema) |
| .set("mymap", map).build(); |
| writer.write(record); |
| } |
| } |
| |
| @Test |
| public void testMapWithUtf8Key() throws Exception { |
| Schema schema = new Schema.Parser().parse( |
| Resources.getResource("map.avsc").openStream()); |
| |
| Path file = new Path(createTempFile().getPath()); |
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(file) |
| .withSchema(schema) |
| .withConf(testConf) |
| .build()) { |
| |
| // Write a record with a map with Utf8 keys. |
| GenericData.Record record = new GenericRecordBuilder(schema) |
| .set("mymap", ImmutableMap.of(new Utf8("a"), 1, new Utf8("b"), 2)) |
| .build(); |
| writer.write(record); |
| } |
| |
| try(AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(testConf, file)) { |
| GenericRecord nextRecord = reader.read(); |
| |
| assertNotNull(nextRecord); |
| assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); |
| } |
| } |
| |
| @Rule |
| public TemporaryFolder temp = new TemporaryFolder(); |
| |
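  /**
   * Writes random decimal(9,2) values backed by BYTES and verifies they are
   * read back as {@link BigDecimal} when a decimal conversion is registered
   * on the data model.
   */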
| @Test |
| public void testDecimalValues() throws Exception { |
| Schema decimalSchema = Schema.createRecord("myrecord", null, null, false); |
| Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( |
| Schema.create(Schema.Type.BYTES)); |
| decimalSchema.setFields(Collections.singletonList( |
| new Schema.Field("dec", decimal, null, null))); |
| |
| // add the decimal conversion to a generic data model |
| GenericData decimalSupport = new GenericData(); |
| decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); |
| |
| File file = temp.newFile("decimal.parquet"); |
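    // temp.newFile creates the file on disk; delete it so the Parquet writer
    // can create it from scratch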
| file.delete(); |
| Path path = new Path(file.toString()); |
| List<GenericRecord> expected = Lists.newArrayList(); |
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(path) |
| .withDataModel(decimalSupport) |
| .withSchema(decimalSchema) |
| .build()) { |
| |
| Random random = new Random(34L); |
| GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema); |
| for (int i = 0; i < 1000; i += 1) { |
        // Generate unscaled values in [-(2^28), 2^30 - 2^28 - 1] so they always fit in 9 decimal digits
| BigDecimal dec = new BigDecimal(new BigInteger(30, random).subtract(BigInteger.valueOf(1L << 28)), 2); |
| builder.set("dec", dec); |
| |
| GenericRecord rec = builder.build(); |
| expected.add(rec); |
| writer.write(builder.build()); |
| } |
| } |
| List<GenericRecord> records = Lists.newArrayList(); |
| |
| try(ParquetReader<GenericRecord> reader = AvroParquetReader |
| .<GenericRecord>builder(path) |
| .withDataModel(decimalSupport) |
| .disableCompatibility() |
| .build()) { |
| GenericRecord rec; |
| while ((rec = reader.read()) != null) { |
| records.add(rec); |
| } |
| } |
| |
| Assert.assertTrue("dec field should be a BigDecimal instance", |
| records.get(0).get("dec") instanceof BigDecimal); |
| Assert.assertEquals("Content should match", expected, records); |
| } |
| |
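  /**
   * Same as {@link #testDecimalValues()}, but with decimal(9,2) backed by a
   * 4-byte FIXED field instead of BYTES.
   */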
| @Test |
| public void testFixedDecimalValues() throws Exception { |
| Schema decimalSchema = Schema.createRecord("myrecord", null, null, false); |
| Schema decimal = LogicalTypes.decimal(9, 2).addToSchema( |
| Schema.createFixed("dec", null, null, 4)); |
| decimalSchema.setFields(Collections.singletonList( |
| new Schema.Field("dec", decimal, null, null))); |
| |
| // add the decimal conversion to a generic data model |
| GenericData decimalSupport = new GenericData(); |
| decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion()); |
| |
| File file = temp.newFile("decimal.parquet"); |
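    // temp.newFile creates the file on disk; delete it so the Parquet writer
    // can create it from scratch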
| file.delete(); |
| Path path = new Path(file.toString()); |
| List<GenericRecord> expected = Lists.newArrayList(); |
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(path) |
| .withDataModel(decimalSupport) |
| .withSchema(decimalSchema) |
| .build()) { |
| |
| Random random = new Random(34L); |
| GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema); |
| for (int i = 0; i < 1000; i += 1) { |
        // Generate unscaled values in [-(2^28), 2^30 - 2^28 - 1] so they always fit in 9 decimal digits
| BigDecimal dec = new BigDecimal(new BigInteger(30, random).subtract(BigInteger.valueOf(1L << 28)), 2); |
| builder.set("dec", dec); |
| |
| GenericRecord rec = builder.build(); |
| expected.add(rec); |
| writer.write(builder.build()); |
| } |
| } |
| List<GenericRecord> records = Lists.newArrayList(); |
| |
| try(ParquetReader<GenericRecord> reader = AvroParquetReader |
| .<GenericRecord>builder(path) |
| .withDataModel(decimalSupport) |
| .disableCompatibility() |
| .build()) { |
| GenericRecord rec; |
| while ((rec = reader.read()) != null) { |
| records.add(rec); |
| } |
| } |
| |
| Assert.assertTrue("dec field should be a BigDecimal instance", |
| records.get(0).get("dec") instanceof BigDecimal); |
| Assert.assertEquals("Content should match", expected, records); |
| } |
| |
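  /**
   * Round-trips a record containing one field of every type in all.avsc and
   * verifies each field, including the enum, whose representation depends on
   * which converters are in use.
   */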
| @Test |
| public void testAll() throws Exception { |
| Schema schema = new Schema.Parser().parse( |
| Resources.getResource("all.avsc").openStream()); |
| |
| Path file = new Path(createTempFile().getPath()); |
| List<Integer> integerArray = Arrays.asList(1, 2, 3); |
| GenericData.Record nestedRecord = new GenericRecordBuilder( |
| schema.getField("mynestedrecord").schema()) |
| .set("mynestedint", 1).build(); |
| List<Integer> emptyArray = new ArrayList<Integer>(); |
| Schema arrayOfOptionalIntegers = Schema.createArray( |
| optional(Schema.create(Schema.Type.INT))); |
| GenericData.Array<Integer> genericIntegerArrayWithNulls = |
| new GenericData.Array<Integer>( |
| arrayOfOptionalIntegers, |
| Arrays.asList(1, null, 2, null, 3)); |
| GenericFixed genericFixed = new GenericData.Fixed( |
| Schema.createFixed("fixed", null, null, 1), new byte[]{(byte) 65}); |
    ImmutableMap<String, Integer> emptyMap = ImmutableMap.of();
| |
| try(ParquetWriter<GenericRecord> writer = AvroParquetWriter |
| .<GenericRecord>builder(file) |
| .withSchema(schema) |
| .withConf(testConf) |
| .build()) { |
| |
| GenericData.Array<Integer> genericIntegerArray = new GenericData.Array<Integer>( |
| Schema.createArray(Schema.create(Schema.Type.INT)), integerArray); |
| |
| GenericData.Record record = new GenericRecordBuilder(schema) |
| .set("mynull", null) |
| .set("myboolean", true) |
| .set("myint", 1) |
| .set("mylong", 2L) |
| .set("myfloat", 3.1f) |
| .set("mydouble", 4.1) |
| .set("mybytes", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))) |
| .set("mystring", "hello") |
| .set("mynestedrecord", nestedRecord) |
| .set("myenum", "a") |
| .set("myarray", genericIntegerArray) |
| .set("myemptyarray", emptyArray) |
| .set("myoptionalarray", genericIntegerArray) |
| .set("myarrayofoptional", genericIntegerArrayWithNulls) |
| .set("mymap", ImmutableMap.of("a", 1, "b", 2)) |
| .set("myemptymap", emptyMap) |
| .set("myfixed", genericFixed) |
| .build(); |
| |
| writer.write(record); |
| } |
| |
| final GenericRecord nextRecord; |
| try(AvroParquetReader<GenericRecord> reader = new AvroParquetReader<GenericRecord>(testConf, file)) { |
| nextRecord = reader.read(); |
| } |
| |
| Object expectedEnumSymbol = compat ? "a" : |
| new GenericData.EnumSymbol(schema.getField("myenum").schema(), "a"); |
| |
| assertNotNull(nextRecord); |
| assertEquals(null, nextRecord.get("mynull")); |
| assertEquals(true, nextRecord.get("myboolean")); |
| assertEquals(1, nextRecord.get("myint")); |
| assertEquals(2L, nextRecord.get("mylong")); |
| assertEquals(3.1f, nextRecord.get("myfloat")); |
| assertEquals(4.1, nextRecord.get("mydouble")); |
| assertEquals(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), nextRecord.get("mybytes")); |
| assertEquals(str("hello"), nextRecord.get("mystring")); |
| assertEquals(expectedEnumSymbol, nextRecord.get("myenum")); |
| assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); |
| assertEquals(integerArray, nextRecord.get("myarray")); |
| assertEquals(emptyArray, nextRecord.get("myemptyarray")); |
| assertEquals(integerArray, nextRecord.get("myoptionalarray")); |
| assertEquals(genericIntegerArrayWithNulls, nextRecord.get("myarrayofoptional")); |
| assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); |
| assertEquals(emptyMap, nextRecord.get("myemptymap")); |
| assertEquals(genericFixed, nextRecord.get("myfixed")); |
| } |
| |
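  /**
   * Writes a file with a hand-rolled {@link WriteSupport} (so no Avro schema
   * is stored in the file metadata) and reads it back through the Avro
   * converters using the default Avro schema derived from the Parquet schema.
   */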
| @Test |
| public void testAllUsingDefaultAvroSchema() throws Exception { |
| Path file = new Path(createTempFile().getPath()); |
| |
| // write file using Parquet APIs |
| try(ParquetWriter<Map<String, Object>> parquetWriter = new ParquetWriter<>(file, |
| new WriteSupport<Map<String, Object>>() { |
| |
| private RecordConsumer recordConsumer; |
| |
| @Override |
| public WriteContext init(Configuration configuration) { |
| return new WriteContext(MessageTypeParser.parseMessageType(TestAvroSchemaConverter.ALL_PARQUET_SCHEMA), |
| new HashMap<String, String>()); |
| } |
| |
| @Override |
| public void prepareForWrite(RecordConsumer recordConsumer) { |
| this.recordConsumer = recordConsumer; |
| } |
| |
| @Override |
| public void write(Map<String, Object> record) { |
| recordConsumer.startMessage(); |
| |
| int index = 0; |
| |
| recordConsumer.startField("myboolean", index); |
| recordConsumer.addBoolean((Boolean) record.get("myboolean")); |
| recordConsumer.endField("myboolean", index++); |
| |
| recordConsumer.startField("myint", index); |
| recordConsumer.addInteger((Integer) record.get("myint")); |
| recordConsumer.endField("myint", index++); |
| |
| recordConsumer.startField("mylong", index); |
| recordConsumer.addLong((Long) record.get("mylong")); |
| recordConsumer.endField("mylong", index++); |
| |
| recordConsumer.startField("myfloat", index); |
| recordConsumer.addFloat((Float) record.get("myfloat")); |
| recordConsumer.endField("myfloat", index++); |
| |
| recordConsumer.startField("mydouble", index); |
| recordConsumer.addDouble((Double) record.get("mydouble")); |
| recordConsumer.endField("mydouble", index++); |
| |
| recordConsumer.startField("mybytes", index); |
| recordConsumer.addBinary( |
| Binary.fromReusedByteBuffer((ByteBuffer) record.get("mybytes"))); |
| recordConsumer.endField("mybytes", index++); |
| |
| recordConsumer.startField("mystring", index); |
| recordConsumer.addBinary(Binary.fromString((String) record.get("mystring"))); |
| recordConsumer.endField("mystring", index++); |
| |
| recordConsumer.startField("mynestedrecord", index); |
| recordConsumer.startGroup(); |
| recordConsumer.startField("mynestedint", 0); |
| recordConsumer.addInteger((Integer) record.get("mynestedint")); |
| recordConsumer.endField("mynestedint", 0); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("mynestedrecord", index++); |
| |
| recordConsumer.startField("myenum", index); |
| recordConsumer.addBinary(Binary.fromString((String) record.get("myenum"))); |
| recordConsumer.endField("myenum", index++); |
| |
| recordConsumer.startField("myarray", index); |
| recordConsumer.startGroup(); |
| recordConsumer.startField("array", 0); |
| for (int val : (int[]) record.get("myarray")) { |
| recordConsumer.addInteger(val); |
| } |
| recordConsumer.endField("array", 0); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("myarray", index++); |
| |
| recordConsumer.startField("myoptionalarray", index); |
| recordConsumer.startGroup(); |
| recordConsumer.startField("array", 0); |
| for (int val : (int[]) record.get("myoptionalarray")) { |
| recordConsumer.addInteger(val); |
| } |
| recordConsumer.endField("array", 0); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("myoptionalarray", index++); |
| |
| recordConsumer.startField("myarrayofoptional", index); |
| recordConsumer.startGroup(); |
| recordConsumer.startField("list", 0); |
| for (Integer val : (Integer[]) record.get("myarrayofoptional")) { |
| recordConsumer.startGroup(); |
| if (val != null) { |
| recordConsumer.startField("element", 0); |
| recordConsumer.addInteger(val); |
| recordConsumer.endField("element", 0); |
| } |
| recordConsumer.endGroup(); |
| } |
| recordConsumer.endField("list", 0); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("myarrayofoptional", index++); |
| |
| recordConsumer.startField("myrecordarray", index); |
| recordConsumer.startGroup(); |
| recordConsumer.startField("array", 0); |
| recordConsumer.startGroup(); |
| recordConsumer.startField("a", 0); |
| for (int val : (int[]) record.get("myrecordarraya")) { |
| recordConsumer.addInteger(val); |
| } |
| recordConsumer.endField("a", 0); |
| recordConsumer.startField("b", 1); |
| for (int val : (int[]) record.get("myrecordarrayb")) { |
| recordConsumer.addInteger(val); |
| } |
| recordConsumer.endField("b", 1); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("array", 0); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("myrecordarray", index++); |
| |
| recordConsumer.startField("mymap", index); |
| recordConsumer.startGroup(); |
| recordConsumer.startField("key_value", 0); |
| recordConsumer.startGroup(); |
| Map<String, Integer> mymap = (Map<String, Integer>) record.get("mymap"); |
| recordConsumer.startField("key", 0); |
| for (String key : mymap.keySet()) { |
| recordConsumer.addBinary(Binary.fromString(key)); |
| } |
| recordConsumer.endField("key", 0); |
| recordConsumer.startField("value", 1); |
| for (int val : mymap.values()) { |
| recordConsumer.addInteger(val); |
| } |
| recordConsumer.endField("value", 1); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("key_value", 0); |
| recordConsumer.endGroup(); |
| recordConsumer.endField("mymap", index++); |
| |
| recordConsumer.startField("myfixed", index); |
| recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) record.get("myfixed"))); |
| recordConsumer.endField("myfixed", index++); |
| |
| recordConsumer.endMessage(); |
| } |
| })) { |
| Map<String, Object> record = new HashMap<String, Object>(); |
| record.put("myboolean", true); |
| record.put("myint", 1); |
| record.put("mylong", 2L); |
| record.put("myfloat", 3.1f); |
| record.put("mydouble", 4.1); |
| record.put("mybytes", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8))); |
| record.put("mystring", "hello"); |
| record.put("myenum", "a"); |
| record.put("mynestedint", 1); |
| record.put("myarray", new int[]{1, 2, 3}); |
| record.put("myoptionalarray", new int[]{1, 2, 3}); |
| record.put("myarrayofoptional", new Integer[]{1, null, 2, null, 3}); |
| record.put("myrecordarraya", new int[]{1, 2, 3}); |
| record.put("myrecordarrayb", new int[]{4, 5, 6}); |
| record.put("mymap", ImmutableMap.of("a", 1, "b", 2)); |
| record.put("myfixed", new byte[]{(byte) 65}); |
| parquetWriter.write(record); |
| } |
| |
| Schema nestedRecordSchema = Schema.createRecord("mynestedrecord", null, null, false); |
| nestedRecordSchema.setFields(Arrays.asList( |
| new Schema.Field("mynestedint", Schema.create(Schema.Type.INT), null, null) |
| )); |
| GenericData.Record nestedRecord = new GenericRecordBuilder(nestedRecordSchema) |
| .set("mynestedint", 1).build(); |
| |
    List<Integer> integerArray = Arrays.asList(1, 2, 3);
    List<Integer> integerArrayWithNulls = Arrays.asList(1, null, 2, null, 3);
| |
| Schema recordArraySchema = Schema.createRecord("array", null, null, false); |
| recordArraySchema.setFields(Arrays.asList( |
| new Schema.Field("a", Schema.create(Schema.Type.INT), null, null), |
| new Schema.Field("b", Schema.create(Schema.Type.INT), null, null) |
| )); |
| GenericRecordBuilder builder = new GenericRecordBuilder(recordArraySchema); |
| List<GenericData.Record> recordArray = new ArrayList<GenericData.Record>(); |
| recordArray.add(builder.set("a", 1).set("b", 4).build()); |
| recordArray.add(builder.set("a", 2).set("b", 5).build()); |
| recordArray.add(builder.set("a", 3).set("b", 6).build()); |
| GenericData.Array<GenericData.Record> genericRecordArray = new GenericData.Array<GenericData.Record>( |
| Schema.createArray(recordArraySchema), recordArray); |
| |
| GenericFixed genericFixed = new GenericData.Fixed( |
| Schema.createFixed("fixed", null, null, 1), new byte[] { (byte) 65 }); |
| |
| try(AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(testConf, file)) { |
| GenericRecord nextRecord = reader.read(); |
| assertNotNull(nextRecord); |
| assertEquals(true, nextRecord.get("myboolean")); |
| assertEquals(1, nextRecord.get("myint")); |
| assertEquals(2L, nextRecord.get("mylong")); |
| assertEquals(3.1f, nextRecord.get("myfloat")); |
| assertEquals(4.1, nextRecord.get("mydouble")); |
| assertEquals(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), nextRecord.get("mybytes")); |
| assertEquals(str("hello"), nextRecord.get("mystring")); |
      assertEquals(str("a"), nextRecord.get("myenum")); // no Avro schema in the file, so enum symbols come back as strings
| assertEquals(nestedRecord, nextRecord.get("mynestedrecord")); |
| assertEquals(integerArray, nextRecord.get("myarray")); |
| assertEquals(integerArray, nextRecord.get("myoptionalarray")); |
      assertEquals(integerArrayWithNulls, nextRecord.get("myarrayofoptional"));
| assertEquals(genericRecordArray, nextRecord.get("myrecordarray")); |
| assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap")); |
| assertEquals(genericFixed, nextRecord.get("myfixed")); |
| } |
| } |
| |
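  /**
   * A union with a single non-null branch is written as the branch type
   * itself and should round-trip transparently.
   */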
| @Test |
| public void testUnionWithSingleNonNullType() throws Exception { |
| Schema avroSchema = Schema.createRecord("SingleStringUnionRecord", null, null, false); |
| avroSchema.setFields( |
| Collections.singletonList(new Schema.Field("value", |
| Schema.createUnion(Schema.create(Schema.Type.STRING)), null, null))); |
| |
| Path file = new Path(createTempFile().getPath()); |
| |
    try (ParquetWriter<GenericRecord> parquetWriter = AvroParquetWriter
        .<GenericRecord>builder(file)
        .withSchema(avroSchema)
        .withConf(new Configuration())
        .build()) {
| |
| GenericRecord record = new GenericRecordBuilder(avroSchema) |
| .set("value", "theValue") |
| .build(); |
| |
| parquetWriter.write(record); |
| } |
| |
| try(AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(testConf, file)) { |
| GenericRecord nextRecord = reader.read(); |
| |
| assertNotNull(nextRecord); |
| assertEquals(str("theValue"), nextRecord.get("value")); |
| } |
| } |
| |
| @Test |
| public void testDuplicatedValuesWithDictionary() throws Exception { |
| Schema schema = SchemaBuilder.record("spark_schema") |
| .fields().optionalBytes("value").endRecord(); |
| |
| Path file = new Path(createTempFile().getPath()); |
| |
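    // repeated values are dictionary-encoded by the writer, so this exercises
    // the dictionary decoding path on read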
| String[] records = {"one", "two", "three", "three", "two", "one", "zero"}; |
| try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter |
| .<GenericData.Record>builder(file) |
| .withSchema(schema) |
| .withConf(testConf) |
| .build()) { |
| for (String record : records) { |
| writer.write(new GenericRecordBuilder(schema) |
          .set("value", record.getBytes(StandardCharsets.UTF_8)).build());
| } |
| } |
| |
| try (ParquetReader<GenericRecord> reader = AvroParquetReader |
| .<GenericRecord>builder(file) |
| .withConf(testConf).build()) { |
| GenericRecord rec; |
| int i = 0; |
| while ((rec = reader.read()) != null) { |
| ByteBuffer buf = (ByteBuffer) rec.get("value"); |
| byte[] bytes = new byte[buf.remaining()]; |
| buf.get(bytes); |
        assertEquals(records[i++], new String(bytes, StandardCharsets.UTF_8));
| } |
| } |
| } |
| |
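  /**
   * Nested lists (an array of records that each contain an array of strings)
   * should round-trip with the three-level list structure.
   */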
| @Test |
| public void testNestedLists() throws Exception { |
| Schema schema = new Schema.Parser().parse( |
| Resources.getResource("nested_array.avsc").openStream()); |
| Path file = new Path(createTempFile().getPath()); |
| |
    // l1 is optional(array<optional(record)>); unwrap both unions to reach the inner record schema
    Schema innerRecordSchema = schema.getField("l1").schema().getTypes()
        .get(1).getElementType().getTypes().get(1);

    GenericRecord record = new GenericRecordBuilder(schema)
      .set("l1", Collections.singletonList(
        new GenericRecordBuilder(innerRecordSchema).set("l2", Collections.singletonList("hello")).build()
      ))
      .build();

    try (ParquetWriter<GenericRecord> parquetWriter = AvroParquetWriter
        .<GenericRecord>builder(file)
        .withSchema(schema)
        .withConf(testConf)
        .build()) {
      parquetWriter.write(record);
    }

    try (AvroParquetReader<GenericRecord> reader = new AvroParquetReader<>(testConf, file)) {
      GenericRecord nextRecord = reader.read();

      assertNotNull(nextRecord);
      assertNotNull(nextRecord.get("l1"));
      List l1List = (List) nextRecord.get("l1");
      assertNotNull(l1List.get(0));
      List l2List = (List) ((GenericRecord) l1List.get(0)).get("l2");
      assertEquals(str("hello"), l2List.get(0));
    }
  }
| |
| /** |
| * A test demonstrating the most simple way to write and read Parquet files |
| * using Avro {@link GenericRecord}. |
| */ |
| @Test |
| public void testSimpleGeneric() throws IOException { |
| final Schema schema = |
| Schema.createRecord("Person", null, "org.apache.parquet", false); |
| schema.setFields(Arrays.asList( |
| new Schema.Field("name", Schema.create(Schema.Type.STRING), null, null), |
| new Schema.Field("weight", Schema.create(Schema.Type.INT), null, |
| null))); |
| |
| final Path file = new Path(createTempFile().getPath()); |
| |
| try (final ParquetWriter<GenericData.Record> parquetWriter = |
| AvroParquetWriter.<GenericData.Record> builder(file).withSchema(schema) |
| .build()) { |
| |
| final GenericData.Record fooRecord = new GenericData.Record(schema); |
| fooRecord.put("name", "foo"); |
| fooRecord.put("weight", 123); |
| |
| final GenericData.Record oofRecord = new GenericData.Record(schema); |
| oofRecord.put("name", "oof"); |
| oofRecord.put("weight", 321); |
| |
| parquetWriter.write(fooRecord); |
| parquetWriter.write(oofRecord); |
| } |
| |
    // Read the file back. String data is returned as org.apache.avro.util.Utf8,
    // so it must be converted to a String before checking equality.
| try (ParquetReader<GenericRecord> reader = |
| AvroParquetReader.genericRecordReader(file)) { |
| |
| final GenericRecord r1 = reader.read(); |
| assertEquals("foo", r1.get("name").toString()); |
| assertEquals(123, r1.get("weight")); |
| |
| final GenericRecord r2 = reader.read(); |
| assertEquals("oof", r2.get("name").toString()); |
| assertEquals(321, r2.get("weight")); |
| } |
| } |
| |
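  /**
   * Reserves a unique temp path: the empty file is deleted immediately so the
   * Parquet writer can create it, while deleteOnExit cleans up the path when
   * the JVM exits.
   */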
| private File createTempFile() throws IOException { |
| File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp"); |
| tmp.deleteOnExit(); |
| tmp.delete(); |
| return tmp; |
| } |
| |
| /** |
| * Return a String or Utf8 depending on whether compatibility is on |
| */ |
| public CharSequence str(String value) { |
| return compat ? value : new Utf8(value); |
| } |
| } |