/**
 * Copyright 2012 Twitter, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package parquet.pig;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import parquet.Log;
import parquet.bytes.BytesInput;
import parquet.column.ColumnDescriptor;
import parquet.column.ParquetProperties.WriterVersion;
import parquet.column.impl.ColumnWriteStoreImpl;
import parquet.column.page.Page;
import parquet.column.page.PageReadStore;
import parquet.column.page.PageReader;
import parquet.column.page.mem.MemPageStore;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.ParquetFileWriter;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ColumnIOFactory;
import parquet.io.MessageColumnIO;
import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type.Repetition;

public class GenerateIntTestFile {
  private static final Log LOG = Log.getLog(GenerateIntTestFile.class);
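
  /**
   * Generates testdata/from_java/int_test_file: a single-row-group Parquet file with one
   * optional INT32 column ("int_col") holding 100 records, where every 10th value is left
   * null. The file is then read back and the row count of its first row group is printed.
   */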
  public static void main(String[] args) throws Throwable {
    File out = new File("testdata/from_java/int_test_file");
    if (out.exists()) {
      if (!out.delete()) {
        throw new RuntimeException("can not remove existing file " + out.getAbsolutePath());
      }
    }
    Path testFile = new Path(out.toURI());
    Configuration configuration = new Configuration();
    {
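      // Schema with a single optional 32-bit integer column named "int_col".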
      MessageType schema = new MessageType("int_test_file", new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int_col"));
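      // Buffer column values into in-memory pages, using 8 KB size thresholds, the
      // Parquet 1.0 page format, and dictionary encoding disabled.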
      MemPageStore pageStore = new MemPageStore(100);
      ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 8*1024, 8*1024, 8*1024, false, WriterVersion.PARQUET_1_0);
      MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
      RecordConsumer recordWriter = columnIO.getRecordWriter(store);
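      // Write 100 records, leaving int_col null for every 10th record.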
      int recordCount = 0;
      for (int i = 0; i < 100; i++) {
        recordWriter.startMessage();
        recordWriter.startField("int_col", 0);
        if (i % 10 != 0) {
          recordWriter.addInteger(i);
        }
        recordWriter.endField("int_col", 0);
        recordWriter.endMessage();
        ++recordCount;
      }
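      // Flush buffered values into pages before writing the row group to the file.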
      store.flush();
      writeToFile(testFile, configuration, schema, pageStore, recordCount);
    }

    {
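      // Read the file back as a sanity check.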
      readTestFile(testFile, configuration);
    }
  }
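
  /**
   * Reads the footer of the given Parquet file and prints the row count of its first row group.
   */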
  public static void readTestFile(Path testFile, Configuration configuration)
      throws IOException {
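    // The footer carries the schema and the row group (block) metadata.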
    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, testFile);
    MessageType schema = readFooter.getFileMetaData().getSchema();
    ParquetFileReader parquetFileReader = new ParquetFileReader(configuration, testFile, readFooter.getBlocks(), schema.getColumns());
    PageReadStore pages = parquetFileReader.readNextRowGroup();
    System.out.println(pages.getRowCount());
  }
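
  /**
   * Writes the buffered pages out as a Parquet file containing a single row group.
   */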
  public static void writeToFile(Path file, Configuration configuration, MessageType schema, MemPageStore pageStore, int recordCount)
      throws IOException {
    ParquetFileWriter w = startFile(file, configuration, schema);
    writeBlock(schema, pageStore, recordCount, w);
    endFile(w);
  }
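
  /**
   * Finishes the file, writing the footer with no extra key/value metadata.
   */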
  public static void endFile(ParquetFileWriter w) throws IOException {
    w.end(new HashMap<String, String>());
  }
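
  /**
   * Writes one row group: for each column, copies pages from the in-memory page store
   * into the file until all buffered values for that column have been written.
   */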
  public static void writeBlock(MessageType schema, MemPageStore pageStore,
      int recordCount, ParquetFileWriter w) throws IOException {
    w.startBlock(recordCount);
    List<ColumnDescriptor> columns = schema.getColumns();
    for (ColumnDescriptor columnDescriptor : columns) {
      PageReader pageReader = pageStore.getPageReader(columnDescriptor);
      long totalValueCount = pageReader.getTotalValueCount();
      w.startColumn(columnDescriptor, totalValueCount, CompressionCodecName.UNCOMPRESSED);
      int n = 0;
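      // Copy pages until every value buffered for this column has been written.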
      do {
        Page page = pageReader.readPage();
        n += page.getValueCount();
        // TODO: change INTFC
        w.writeDataPage(
            page.getValueCount(),
            (int)page.getBytes().size(),
            BytesInput.from(page.getBytes().toByteArray()),
            page.getRlEncoding(),
            page.getDlEncoding(),
            page.getValueEncoding());
      } while (n < totalValueCount);
      w.endColumn();
    }
    w.endBlock();
  }
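
  /**
   * Creates a ParquetFileWriter for the given schema and output path and starts the file.
   */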
  public static ParquetFileWriter startFile(Path file,
      Configuration configuration, MessageType schema) throws IOException {
    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, file);
    w.start();
    return w;
  }
}