/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.hadoop.thrift;

import static org.junit.Assert.assertEquals;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.BooleanStatistics;
import org.apache.parquet.column.statistics.DoubleStatistics;
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.thrift.ParquetWriteProtocol;
import org.apache.parquet.thrift.test.RequiredPrimitiveFixture;
import org.apache.parquet.thrift.test.TestListsInMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.junit.Test;

import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.TestUtils;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import com.twitter.data.proto.tutorial.thrift.AddressBook;
import com.twitter.data.proto.tutorial.thrift.Name;
import com.twitter.data.proto.tutorial.thrift.Person;
import com.twitter.data.proto.tutorial.thrift.PhoneNumber;
import com.twitter.elephantbird.thrift.test.TestListInMap;
import com.twitter.elephantbird.thrift.test.TestMapInList;

import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

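/**
 * Tests that thrift objects serialized with {@link TCompactProtocol} can be
 * written to Parquet via {@link ThriftToParquetFileWriter} and read back
 * through the example {@link Group} API.
 */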
public class TestThriftToParquetFileWriter {
  private static final Logger LOG = LoggerFactory.getLogger(TestThriftToParquetFileWriter.class);

  @Test
  public void testWriteFile() throws IOException, InterruptedException, TException {
    final AddressBook a = new AddressBook(
        Arrays.asList(
            new Person(
                new Name("Bob", "Roberts"),
                0,
                "bob.roberts@example.com",
                Arrays.asList(new PhoneNumber("1234567890")))));

    final Path fileToCreate = createFile(new Configuration(), a);

    ParquetReader<Group> reader = createRecordReader(fileToCreate);

    Group g = null;
    int i = 0;
    while ((g = reader.read()) != null) {
      assertEquals(a.persons.size(), g.getFieldRepetitionCount("persons"));
      assertEquals(a.persons.get(0).email, g.getGroup("persons", 0).getGroup(0, 0).getString("email", 0));
      // just a sanity check; the individual layers are tested elsewhere
      ++i;
    }
    assertEquals("read 1 record", 1, i);
  }
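
  /**
   * Writes rows with known extreme values and verifies that the per-column
   * min/max statistics recorded in the file footer match the expected ranges.
   */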
  @Test
  public void testWriteStatistics() throws Exception {
    // expected statistics for the small-number rows
    IntStatistics intStatsSmall = new IntStatistics();
    intStatsSmall.setMinMax(2, 100);
    LongStatistics longStatsSmall = new LongStatistics();
    longStatsSmall.setMinMax(-17L, 287L);
    DoubleStatistics doubleStatsSmall = new DoubleStatistics();
    doubleStatsSmall.setMinMax(-15.55d, 9.63d);
    BinaryStatistics binaryStatsSmall = new BinaryStatistics();
    binaryStatsSmall.setMinMax(Binary.fromString("as"), Binary.fromString("world"));
    BooleanStatistics boolStats = new BooleanStatistics();
    boolStats.setMinMax(false, true);

    // write rows to a file
    Path p = createFile(new Configuration(),
        new RequiredPrimitiveFixture(false, (byte)32, (short)32, 2, 90L, -15.55d, "as"),
        new RequiredPrimitiveFixture(false, (byte)100, (short)100, 100, 287L, -9.0d, "world"),
        new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17L, 9.63d, "hello"));
    final Configuration configuration = new Configuration();
    configuration.setBoolean("parquet.strings.signed-min-max.enabled", true);
    ParquetMetadata footer = ParquetFileReader.readFooter(configuration, p);
    for (BlockMetaData bmd : footer.getBlocks()) {
      for (ColumnChunkMetaData cmd : bmd.getColumns()) {
        switch (cmd.getType()) {
          case INT32:
            TestUtils.assertStatsValuesEqual(intStatsSmall, cmd.getStatistics());
            break;
          case INT64:
            TestUtils.assertStatsValuesEqual(longStatsSmall, cmd.getStatistics());
            break;
          case DOUBLE:
            TestUtils.assertStatsValuesEqual(doubleStatsSmall, cmd.getStatistics());
            break;
          case BOOLEAN:
            TestUtils.assertStatsValuesEqual(boolStats, cmd.getStatistics());
            break;
          case BINARY:
            // info_string is also BINARY but has no statistics, so match on the column path
            if ("[test_string]".equals(cmd.getPath().toString())) {
              TestUtils.assertStatsValuesEqual(binaryStatsSmall, cmd.getStatistics());
            }
            break;
        }
      }
    }
    // expected statistics for the large-number rows
    IntStatistics intStatsLarge = new IntStatistics();
    intStatsLarge.setMinMax(-Integer.MAX_VALUE, Integer.MAX_VALUE);
    LongStatistics longStatsLarge = new LongStatistics();
    longStatsLarge.setMinMax(-Long.MAX_VALUE, Long.MAX_VALUE);
    DoubleStatistics doubleStatsLarge = new DoubleStatistics();
    doubleStatsLarge.setMinMax(-Double.MAX_VALUE, Double.MAX_VALUE);
    BinaryStatistics binaryStatsLarge = new BinaryStatistics();
    binaryStatsLarge.setMinMax(Binary.fromString("some small string"),
        Binary.fromString("some very large string here to test in this function"));
    // write rows to a file
    Path p_large = createFile(new Configuration(),
        new RequiredPrimitiveFixture(false, (byte)2, (short)32, -Integer.MAX_VALUE,
            -Long.MAX_VALUE, -Double.MAX_VALUE, "some small string"),
        new RequiredPrimitiveFixture(false, (byte)100, (short)100, Integer.MAX_VALUE,
            Long.MAX_VALUE, Double.MAX_VALUE,
            "some very large string here to test in this function"),
        new RequiredPrimitiveFixture(true, (byte)2, (short)2, 9, -17L, 9.63d, "hello"));

    // read back with a fresh configuration and verify the large-number stats
    final Configuration configuration_large = new Configuration();
    configuration_large.setBoolean("parquet.strings.signed-min-max.enabled", true);
    ParquetMetadata footer_large = ParquetFileReader.readFooter(configuration_large, p_large);
    for (BlockMetaData bmd : footer_large.getBlocks()) {
      for (ColumnChunkMetaData cmd : bmd.getColumns()) {
        switch (cmd.getType()) {
          case INT32:
            // tests the int32 limits; the byte and short fields were covered above
            if ("[test_i32]".equals(cmd.getPath().toString())) {
              TestUtils.assertStatsValuesEqual(intStatsLarge, cmd.getStatistics());
            }
            break;
          case INT64:
            TestUtils.assertStatsValuesEqual(longStatsLarge, cmd.getStatistics());
            break;
          case DOUBLE:
            TestUtils.assertStatsValuesEqual(doubleStatsLarge, cmd.getStatistics());
            break;
          case BOOLEAN:
            TestUtils.assertStatsValuesEqual(boolStats, cmd.getStatistics());
            break;
          case BINARY:
            // info_string is also BINARY but has no statistics, so match on the column path
            if ("[test_string]".equals(cmd.getPath().toString())) {
              TestUtils.assertStatsValuesEqual(binaryStatsLarge, cmd.getStatistics());
            }
            break;
        }
      }
    }
  }

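  /**
   * Round-trips a thrift list of maps and checks the repeated group structure.
   */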
  @Test
  public void testWriteFileListOfMap() throws IOException, InterruptedException, TException {
    Map<String, String> map1 = new HashMap<String, String>();
    map1.put("key11", "value11");
    map1.put("key12", "value12");
    Map<String, String> map2 = new HashMap<String, String>();
    map2.put("key21", "value21");
    final TestMapInList listMap = new TestMapInList("listmap",
        Arrays.asList(map1, map2));

    final Path fileToCreate = createFile(new Configuration(), listMap);

    ParquetReader<Group> reader = createRecordReader(fileToCreate);

    Group g = null;
    while ((g = reader.read()) != null) {
      assertEquals(listMap.names.size(),
          g.getGroup("names", 0).getFieldRepetitionCount("names_tuple"));
      assertEquals(listMap.names.get(0).size(),
          g.getGroup("names", 0).getGroup("names_tuple", 0).getFieldRepetitionCount("key_value"));
      assertEquals(listMap.names.get(1).size(),
          g.getGroup("names", 0).getGroup("names_tuple", 1).getFieldRepetitionCount("key_value"));
    }
  }

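  /**
   * Round-trips a thrift map whose values are lists and checks the keys and
   * the value counts.
   */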
  @Test
  public void testWriteFileMapOfList() throws IOException, InterruptedException, TException {
    Map<String, List<String>> map = new HashMap<String, List<String>>();
    map.put("key", Arrays.asList("val1", "val2"));
    final TestListInMap mapList = new TestListInMap("maplist", map);
    final Path fileToCreate = createFile(new Configuration(), mapList);

    ParquetReader<Group> reader = createRecordReader(fileToCreate);

    Group g = null;
    while ((g = reader.read()) != null) {
      assertEquals("key",
          g.getGroup("names", 0).getGroup("key_value", 0).getBinary("key", 0).toStringUsingUTF8());
      assertEquals(map.get("key").size(),
          g.getGroup("names", 0).getGroup("key_value", 0).getGroup("value", 0).getFieldRepetitionCount(0));
    }
  }

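  /**
   * Round-trips a thrift map whose keys and values are both lists.
   */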
  @Test
  public void testWriteFileMapOfLists() throws IOException, InterruptedException, TException {
    Map<List<String>, List<String>> map = new HashMap<List<String>, List<String>>();
    map.put(Arrays.asList("key1", "key2"), Arrays.asList("val1", "val2"));
    final TestListsInMap mapList = new TestListsInMap("maplists", map);
    final Path fileToCreate = createFile(new Configuration(), mapList);

    ParquetReader<Group> reader = createRecordReader(fileToCreate);

    Group g = null;
    while ((g = reader.read()) != null) {
      assertEquals("key1",
          g.getGroup("names", 0).getGroup("key_value", 0).getGroup("key", 0).getBinary("key_tuple", 0).toStringUsingUTF8());
      assertEquals("key2",
          g.getGroup("names", 0).getGroup("key_value", 0).getGroup("key", 0).getBinary("key_tuple", 1).toStringUsingUTF8());
      assertEquals("val1",
          g.getGroup("names", 0).getGroup("key_value", 0).getGroup("value", 0).getBinary("value_tuple", 0).toStringUsingUTF8());
      assertEquals("val2",
          g.getGroup("names", 0).getGroup("key_value", 0).getGroup("value", 0).getBinary("value_tuple", 1).toStringUsingUTF8());
    }
  }

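  /**
   * Same as {@link #testWriteFile()} but with
   * {@link ParquetWriteProtocol#WRITE_THREE_LEVEL_LISTS} enabled, so lists are
   * written with the three-level list/element structure.
   */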
  @Test
  public void testWriteFileWithThreeLevelsList()
      throws IOException, InterruptedException, TException {
    final AddressBook a = new AddressBook(
        Arrays.asList(
            new Person(
                new Name("Bob", "Roberts"),
                0,
                "bob.roberts@example.com",
                Arrays.asList(new PhoneNumber("1234567890")))));

    Configuration conf = new Configuration();
    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");

    final Path fileToCreate = createFile(conf, a);

    ParquetReader<Group> reader = createRecordReader(fileToCreate);

    Group g = null;
    int i = 0;
    while ((g = reader.read()) != null) {
      assertEquals(a.persons.size(), g.getFieldRepetitionCount("persons"));
      assertEquals(
          a.persons.get(0).email,
          g.getGroup("persons", 0).getGroup(0, 0).getGroup(0, 0).getString("email", 0));
      // just a sanity check; the individual layers are tested elsewhere
      ++i;
    }
    assertEquals("read 1 record", 1, i);
  }

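  /**
   * Same as {@link #testWriteFileListOfMap()} but with three-level lists, so the
   * repeated field is named "list" and each map sits under an "element" wrapper.
   */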
  @Test
  public void testWriteFileListOfMapWithThreeLevelLists()
      throws IOException, InterruptedException, TException {
    Map<String, String> map1 = new HashMap<String, String>();
    map1.put("key11", "value11");
    map1.put("key12", "value12");
    Map<String, String> map2 = new HashMap<String, String>();
    map2.put("key21", "value21");
    final TestMapInList listMap = new TestMapInList("listmap",
        Arrays.asList(map1, map2));

    Configuration conf = new Configuration();
    conf.set(ParquetWriteProtocol.WRITE_THREE_LEVEL_LISTS, "true");

    final Path fileToCreate = createFile(conf, listMap);

    ParquetReader<Group> reader = createRecordReader(fileToCreate);

    Group g = null;
    while ((g = reader.read()) != null) {
      assertEquals(listMap.names.size(),
          g.getGroup("names", 0).getFieldRepetitionCount("list"));
      assertEquals(listMap.names.get(0).size(),
          g.getGroup("names", 0).getGroup("list", 0)
              .getGroup("element", 0).getFieldRepetitionCount("key_value"));
      assertEquals(listMap.names.get(1).size(),
          g.getGroup("names", 0).getGroup("list", 1)
              .getGroup("element", 0).getFieldRepetitionCount("key_value"));
    }
  }

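  /**
   * Opens the given Parquet file with {@link GroupReadSupport}, initializing the
   * read support with the schema taken from the file footer.
   */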
  private ParquetReader<Group> createRecordReader(Path parquetFilePath) throws IOException {
    Configuration configuration = new Configuration(true);

    GroupReadSupport readSupport = new GroupReadSupport();
    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, parquetFilePath);
    MessageType schema = readFooter.getFileMetaData().getSchema();

    readSupport.init(new InitContext(configuration, null, schema));
    return new ParquetReader<Group>(parquetFilePath, readSupport);
  }

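  /**
   * Serializes each thrift object with {@link TCompactProtocol} and writes the
   * resulting records to a fresh Parquet file under target/test, replacing any
   * previous output.
   */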
  private <T extends TBase<?, ?>> Path createFile(Configuration conf, T... tObjs)
      throws IOException, InterruptedException, TException {
    final Path fileToCreate = new Path("target/test/TestThriftToParquetFileWriter/" + tObjs[0].getClass() + ".parquet");
    LOG.info("File created: {}", fileToCreate);
    final FileSystem fs = fileToCreate.getFileSystem(conf);
    if (fs.exists(fileToCreate)) {
      fs.delete(fileToCreate, true);
    }
    TProtocolFactory protocolFactory = new TCompactProtocol.Factory();
    TaskAttemptID taskId = new TaskAttemptID("local", 0, true, 0, 0);
    ThriftToParquetFileWriter w = new ThriftToParquetFileWriter(fileToCreate,
        ContextUtil.newTaskAttemptContext(conf, taskId), protocolFactory,
        (Class<? extends TBase<?, ?>>) tObjs[0].getClass());

    for (T tObj : tObjs) {
      final ByteArrayOutputStream baos = new ByteArrayOutputStream();
      final TProtocol protocol = protocolFactory.getProtocol(new TIOStreamTransport(baos));

      tObj.write(protocol);

      w.write(new BytesWritable(baos.toByteArray()));
    }
    w.close();

    return fileToCreate;
  }
}