| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.storage.thirdparty.parquet; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import parquet.column.ParquetProperties; |
| import parquet.hadoop.api.WriteSupport; |
| import parquet.hadoop.metadata.CompressionCodecName; |
| import parquet.schema.MessageType; |
| |
| import java.io.Closeable; |
| import java.io.IOException; |
| |
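/**
 * Writes records of type {@code T} to a Parquet file. Records handed to
 * {@link #write(Object)} are passed to the supplied {@link parquet.hadoop.api.WriteSupport},
 * buffered in memory, and flushed to the file as row groups; {@link #close()}
 * must be called to flush the last row group and write the file footer.
 *
 * <p>A minimal usage sketch; {@code MyRecord} and {@code MyWriteSupport} are
 * placeholders for a caller-supplied record type and its
 * {@link parquet.hadoop.api.WriteSupport} implementation:
 *
 * <pre>{@code
 * ParquetWriter<MyRecord> writer = new ParquetWriter<MyRecord>(
 *     new Path("/tmp/data.parquet"),
 *     new MyWriteSupport(),
 *     CompressionCodecName.SNAPPY,
 *     ParquetWriter.DEFAULT_BLOCK_SIZE,
 *     ParquetWriter.DEFAULT_PAGE_SIZE);
 * try {
 *   for (MyRecord record : records) {
 *     writer.write(record);
 *   }
 * } finally {
 *   writer.close();
 * }
 * }</pre>
 */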
| public class ParquetWriter<T> implements Closeable { |
| |
| public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024; |
| public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024; |
| public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME = |
| CompressionCodecName.UNCOMPRESSED; |
| public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; |
| public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false; |
| public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION = |
| ParquetProperties.WriterVersion.PARQUET_1_0; |
| |
| private final InternalParquetRecordWriter<T> writer; |
| |
| /** |
| * Create a new ParquetWriter. |
| * (with dictionary encoding enabled and validation off) |
| * |
| * @param file the file to create |
| * @param writeSupport the implementation to write a record to a RecordConsumer |
| * @param compressionCodecName the compression codec to use |
| * @param blockSize the block size threshold |
| * @param pageSize the page size threshold |
| * @throws java.io.IOException |
| * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean) |
| */ |
  public ParquetWriter(
      Path file,
      WriteSupport<T> writeSupport,
      CompressionCodecName compressionCodecName,
      int blockSize,
      int pageSize) throws IOException {
| this(file, writeSupport, compressionCodecName, blockSize, pageSize, |
| DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED); |
| } |
| |
| /** |
| * Create a new ParquetWriter. |
| * |
| * @param file the file to create |
| * @param writeSupport the implementation to write a record to a RecordConsumer |
| * @param compressionCodecName the compression codec to use |
| * @param blockSize the block size threshold |
   * @param pageSize the page size threshold (used for both data and dictionary pages)
   * @param enableDictionary whether to enable dictionary encoding
   * @param validating whether to validate records against the schema
   * @throws java.io.IOException if the file cannot be created
| * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean) |
| */ |
| public ParquetWriter( |
| Path file, |
| WriteSupport<T> writeSupport, |
| CompressionCodecName compressionCodecName, |
| int blockSize, |
| int pageSize, |
| boolean enableDictionary, |
| boolean validating) throws IOException { |
    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
        pageSize, enableDictionary, validating);
| } |
| |
| /** |
| * Create a new ParquetWriter. |
| * |
| * @param file the file to create |
| * @param writeSupport the implementation to write a record to a RecordConsumer |
| * @param compressionCodecName the compression codec to use |
| * @param blockSize the block size threshold |
| * @param pageSize the page size threshold |
   * @param dictionaryPageSize the page size threshold for the dictionary pages
   * @param enableDictionary whether to enable dictionary encoding
   * @param validating whether to validate records against the schema
   * @throws java.io.IOException if the file cannot be created
| * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion) |
| */ |
| public ParquetWriter( |
| Path file, |
| WriteSupport<T> writeSupport, |
| CompressionCodecName compressionCodecName, |
| int blockSize, |
| int pageSize, |
| int dictionaryPageSize, |
| boolean enableDictionary, |
| boolean validating) throws IOException { |
| this(file, writeSupport, compressionCodecName, blockSize, pageSize, |
| dictionaryPageSize, enableDictionary, validating, |
| DEFAULT_WRITER_VERSION); |
| } |
| |
| /** |
| * Create a new ParquetWriter. |
| * |
| * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads |
| * configuration from the classpath. |
| * |
| * @param file the file to create |
| * @param writeSupport the implementation to write a record to a RecordConsumer |
| * @param compressionCodecName the compression codec to use |
| * @param blockSize the block size threshold |
| * @param pageSize the page size threshold |
| * @param dictionaryPageSize the page size threshold for the dictionary pages |
   * @param enableDictionary whether to enable dictionary encoding
   * @param validating whether to validate records against the schema
   * @param writerVersion the format version to write, from {@link parquet.column.ParquetProperties.WriterVersion}
   * @throws java.io.IOException if the file cannot be created
| * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration) |
| */ |
| public ParquetWriter( |
| Path file, |
| WriteSupport<T> writeSupport, |
| CompressionCodecName compressionCodecName, |
| int blockSize, |
| int pageSize, |
| int dictionaryPageSize, |
| boolean enableDictionary, |
| boolean validating, |
| ParquetProperties.WriterVersion writerVersion) throws IOException { |
    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
        dictionaryPageSize, enableDictionary, validating, writerVersion,
        new Configuration());
| } |
| |
| /** |
| * Create a new ParquetWriter. |
| * |
| * @param file the file to create |
| * @param writeSupport the implementation to write a record to a RecordConsumer |
| * @param compressionCodecName the compression codec to use |
| * @param blockSize the block size threshold |
| * @param pageSize the page size threshold |
| * @param dictionaryPageSize the page size threshold for the dictionary pages |
   * @param enableDictionary whether to enable dictionary encoding
   * @param validating whether to validate records against the schema
   * @param writerVersion the format version to write, from {@link parquet.column.ParquetProperties.WriterVersion}
   * @param conf Hadoop configuration to use while accessing the filesystem
   * @throws java.io.IOException if the file cannot be created
| */ |
| public ParquetWriter( |
| Path file, |
| WriteSupport<T> writeSupport, |
| CompressionCodecName compressionCodecName, |
| int blockSize, |
| int pageSize, |
| int dictionaryPageSize, |
| boolean enableDictionary, |
| boolean validating, |
| ParquetProperties.WriterVersion writerVersion, |
| Configuration conf) throws IOException { |
| |
    // Let the WriteSupport initialize itself and supply the file schema plus
    // any extra key/value metadata to be stored in the footer.
    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
    MessageType schema = writeContext.getSchema();

    // Create the output file and write the Parquet magic bytes.
    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
    fileWriter.start();

    // Obtain a compressor for the requested codec; the second argument (0) is
    // an initial buffer size hint, matching upstream parquet-mr.
    CodecFactory codecFactory = new CodecFactory(conf);
    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
| this.writer = new InternalParquetRecordWriter<>( |
| fileWriter, |
| writeSupport, |
| schema, |
| writeContext.getExtraMetaData(), |
| blockSize, |
| pageSize, |
| compressor, |
| dictionaryPageSize, |
| enableDictionary, |
| validating, |
| writerVersion); |
| } |
| |
| /** |
   * Create a new ParquetWriter using the default settings: a 128 MB block
   * size, a 1 MB page size, no compression, and dictionary encoding enabled.
| * |
| * @param file the file to create |
| * @param writeSupport the implementation to write a record to a RecordConsumer |
   * @throws java.io.IOException if the file cannot be created
| */ |
| public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException { |
| this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); |
| } |
| |
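  /**
   * Writes the given record to the file.
   *
   * @param object the record to write
   * @throws java.io.IOException if the record cannot be written, or if the
   *           underlying writer is interrupted
   */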
| public void write(T object) throws IOException { |
| try { |
| writer.write(object); |
| } catch (InterruptedException e) { |
| throw new IOException(e); |
| } |
| } |
| |
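  /**
   * Returns an estimate of the number of bytes written to the output so far.
   *
   * @return the estimated written size in bytes
   * @throws java.io.IOException if the size cannot be determined
   */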
| public long getEstimatedWrittenSize() throws IOException { |
| return this.writer.getEstimatedWrittenSize(); |
| } |
| |
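  /**
   * Flushes any buffered records, writes the file footer, and closes the
   * underlying output file.
   *
   * @throws java.io.IOException if the file cannot be closed, or if the
   *           underlying writer is interrupted
   */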
| @Override |
| public void close() throws IOException { |
| try { |
| writer.close(); |
| } catch (InterruptedException e) { |
| throw new IOException(e); |
| } |
| }} |