| /** |
| * Copyright 2012 Twitter, Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package parquet.pig; |
| |
| import static parquet.pig.GenerateIntTestFile.readTestFile; |
| import static parquet.pig.GenerateIntTestFile.writeToFile; |
| |
| import java.io.File; |
| import java.io.IOException; |
| |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| |
| import parquet.Log; |
| import parquet.column.ParquetProperties.WriterVersion; |
| import parquet.column.impl.ColumnWriteStoreImpl; |
| import parquet.column.page.mem.MemPageStore; |
| import parquet.io.ColumnIOFactory; |
| import parquet.io.MessageColumnIO; |
| import parquet.io.api.Binary; |
| import parquet.io.api.RecordConsumer; |
| import parquet.schema.MessageType; |
| import parquet.schema.PrimitiveType; |
| import parquet.schema.PrimitiveType.PrimitiveTypeName; |
| import parquet.schema.Type.Repetition; |
| |
| public class GenerateTPCH { |
| private static final Log LOG = Log.getLog(GenerateTPCH.class); |
| |
| public static void main(String[] args) throws IOException { |
| File out = new File("testdata/from_java/tpch/customer"); |
| if (out.exists()) { |
| if (!out.delete()) { |
| throw new RuntimeException("can not remove existing file " + out.getAbsolutePath()); |
| } |
| } |
| Path testFile = new Path(out.toURI()); |
| Configuration configuration = new Configuration(); |
| MessageType schema = new MessageType("customer", |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "c_custkey"), |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_name"), |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_address"), |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "c_nationkey"), |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_phone"), |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "c_acctbal"), |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_mktsegment"), |
| new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_comment") |
| ); |
| |
| MemPageStore pageStore = new MemPageStore(150000); |
| ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 20*1024, 1*1024, 20*1024, false, WriterVersion.PARQUET_1_0); |
| MessageColumnIO columnIO = new ColumnIOFactory(true).getColumnIO(schema); |
| |
| RecordConsumer recordWriter = columnIO.getRecordWriter(store); |
| |
| int recordCount = 0; |
| for (int i = 0; i < 150000; i++) { |
| recordWriter.startMessage(); |
| writeField(recordWriter, 0, "c_custkey", i % 10 == 0 ? null : i); |
| writeField(recordWriter, 1, "c_name", i % 11 == 0 ? null : "name_" + i); |
| writeField(recordWriter, 2, "c_address", i % 12 == 0 ? null : "add_" + i); |
| writeField(recordWriter, 3, "c_nationkey", i % 13 == 0 ? null : i); |
| writeField(recordWriter, 4, "c_phone", i % 14 == 0 ? null : "phone_" + i); |
| writeField(recordWriter, 5, "c_acctbal", i % 15 == 0 ? null : 1.2d * i); |
| writeField(recordWriter, 6, "c_mktsegment", i % 16 == 0 ? null : "mktsegment_" + i); |
| writeField(recordWriter, 7, "c_comment", i % 17 == 0 ? null : "comment_" + i); |
| recordWriter.endMessage(); |
| ++ recordCount; |
| } |
| store.flush(); |
| System.out.printf("mem size %,d, maxColSize %,d, allocated %,d\n", store.memSize(), store.maxColMemSize(), store.allocatedSize()); |
| System.out.println(store.memUsageString()); |
| writeToFile(testFile, configuration, schema, pageStore, recordCount); |
| |
| try { |
| readTestFile(testFile, configuration); |
| } catch (Exception e) { |
| LOG.error("failed reading", e); |
| } |
| |
| } |
| |
| private static void writeField(RecordConsumer recordWriter, int index, String name, Object value) { |
| if (value != null) { |
| recordWriter.startField(name, index); |
| if (value instanceof Integer) { |
| recordWriter.addInteger((Integer)value); |
| } else if (value instanceof String) { |
| recordWriter.addBinary(Binary.fromString((String)value)); |
| } else if (value instanceof Double) { |
| recordWriter.addDouble((Double)value); |
| } else { |
| throw new IllegalArgumentException(value.getClass().getName() + " not supported"); |
| } |
| recordWriter.endField(name, index); |
| } |
| } |
| } |