/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.thirdparty.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import parquet.column.ParquetProperties;
import parquet.hadoop.api.WriteSupport;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;

import java.io.Closeable;
import java.io.IOException;
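
/**
* Writes records of type {@code T} to a Parquet file. Records are translated
* to the Parquet representation by the supplied
* {@link parquet.hadoop.api.WriteSupport}, buffered in memory, and flushed to
* the file as complete row groups.
*
* <p>A minimal usage sketch ({@code MyRecord} and {@code MyWriteSupport} are
* placeholders for an application record type and a concrete
* {@code WriteSupport} implementation; they are not part of this API):
*
* <pre>{@code
* Path path = new Path("/tmp/example.parquet");
* ParquetWriter<MyRecord> writer =
*     new ParquetWriter<MyRecord>(path, new MyWriteSupport(schema));
* try {
*   for (MyRecord record : records) {
*     writer.write(record);
*   }
* } finally {
*   writer.close();
* }
* }</pre>
*/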
public class ParquetWriter<T> implements Closeable {
public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
CompressionCodecName.UNCOMPRESSED;
public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
ParquetProperties.WriterVersion.PARQUET_1_0;

private final InternalParquetRecordWriter<T> writer;

/**
* Create a new ParquetWriter with dictionary encoding enabled and schema
* validation disabled.
*
* @param file the file to create
* @param writeSupport the implementation to write a record to a RecordConsumer
* @param compressionCodecName the compression codec to use
* @param blockSize the block size threshold
* @param pageSize the page size threshold
* @throws java.io.IOException if the file cannot be created
* @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean)
*/
public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
this(file, writeSupport, compressionCodecName, blockSize, pageSize,
DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
}
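
// Example (a sketch only; MyRecord and MyWriteSupport are placeholders for an
// application record type and a concrete WriteSupport implementation): write a
// Snappy-compressed file with the default dictionary and validation settings.
//
//   new ParquetWriter<MyRecord>(path, new MyWriteSupport(schema),
//       CompressionCodecName.SNAPPY, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
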
/**
* Create a new ParquetWriter.
*
* @param file the file to create
* @param writeSupport the implementation to write a record to a RecordConsumer
* @param compressionCodecName the compression codec to use
* @param blockSize the block size threshold
* @param pageSize the page size threshold (both data and dictionary)
* @param enableDictionary to turn dictionary encoding on
* @param validating to turn on validation using the schema
* @throws java.io.IOException if the file cannot be created
* @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean)
*/
public ParquetWriter(
Path file,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
boolean enableDictionary,
boolean validating) throws IOException {
this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
}

/**
* Create a new ParquetWriter.
*
* @param file the file to create
* @param writeSupport the implementation to write a record to a RecordConsumer
* @param compressionCodecName the compression codec to use
* @param blockSize the block size threshold
* @param pageSize the page size threshold
* @param dictionaryPageSize the page size threshold for the dictionary pages
* @param enableDictionary to turn dictionary encoding on
* @param validating to turn on validation using the schema
* @throws java.io.IOException if the file cannot be created
* @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
*/
public ParquetWriter(
Path file,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating) throws IOException {
this(file, writeSupport, compressionCodecName, blockSize, pageSize,
dictionaryPageSize, enableDictionary, validating,
DEFAULT_WRITER_VERSION);
}

/**
* Create a new ParquetWriter.
*
* Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
* configuration from the classpath.
*
* @param file the file to create
* @param writeSupport the implementation to write a record to a RecordConsumer
* @param compressionCodecName the compression codec to use
* @param blockSize the block size threshold
* @param pageSize the page size threshold
* @param dictionaryPageSize the page size threshold for the dictionary pages
* @param enableDictionary to turn dictionary encoding on
* @param validating to turn on validation using the schema
* @param writerVersion the Parquet format version to write, from {@link parquet.column.ParquetProperties.WriterVersion}
* @throws java.io.IOException if the file cannot be created
* @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
*/
public ParquetWriter(
Path file,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
ParquetProperties.WriterVersion writerVersion) throws IOException {
this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
}
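
// Example (a sketch only): explicitly select the Parquet v2 page format while
// keeping every other setting at its default.
//
//   new ParquetWriter<MyRecord>(path, new MyWriteSupport(schema),
//       DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE,
//       DEFAULT_PAGE_SIZE, DEFAULT_IS_DICTIONARY_ENABLED,
//       DEFAULT_IS_VALIDATING_ENABLED,
//       ParquetProperties.WriterVersion.PARQUET_2_0);
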
/**
* Create a new ParquetWriter.
*
* @param file the file to create
* @param writeSupport the implementation to write a record to a RecordConsumer
* @param compressionCodecName the compression codec to use
* @param blockSize the block size threshold
* @param pageSize the page size threshold
* @param dictionaryPageSize the page size threshold for the dictionary pages
* @param enableDictionary to turn dictionary encoding on
* @param validating to turn on validation using the schema
* @param writerVersion the Parquet format version to write, from {@link parquet.column.ParquetProperties.WriterVersion}
* @param conf Hadoop configuration to use while accessing the filesystem
* @throws java.io.IOException if the file cannot be created
*/
public ParquetWriter(
Path file,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
int blockSize,
int pageSize,
int dictionaryPageSize,
boolean enableDictionary,
boolean validating,
ParquetProperties.WriterVersion writerVersion,
Configuration conf) throws IOException {
// Ask the WriteSupport for the file schema and any extra metadata to embed.
WriteSupport.WriteContext writeContext = writeSupport.init(conf);
MessageType schema = writeContext.getSchema();

// Open the target file and write the Parquet magic header.
ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
fileWriter.start();

// Resolve the compressor backing the requested codec.
CodecFactory codecFactory = new CodecFactory(conf);
CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
this.writer = new InternalParquetRecordWriter<>(
fileWriter,
writeSupport,
schema,
writeContext.getExtraMetaData(),
blockSize,
pageSize,
compressor,
dictionaryPageSize,
enableDictionary,
validating,
writerVersion);
}

/**
* Create a new ParquetWriter. The default block size is 128 MB and the default
* page size is 1 MB. Compression is disabled by default, dictionary encoding
* is enabled, and schema validation is off.
*
* @param file the file to create
* @param writeSupport the implementation to write a record to a RecordConsumer
* @throws java.io.IOException if the file cannot be created
*/
public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
}
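
/**
* Writes the given record. Records are buffered in memory and flushed to the
* file as a row group once the configured block size threshold is reached.
*
* @param object the record to write
* @throws java.io.IOException if the record cannot be written
*/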
public void write(T object) throws IOException {
try {
writer.write(object);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
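
/**
* Returns an estimate of the size of the output written so far, which may
* include data that is still buffered in memory and not yet flushed to the
* file.
*
* @return the estimated number of bytes written
* @throws java.io.IOException if the estimate cannot be computed
*/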
public long getEstimatedWrittenSize() throws IOException {
return this.writer.getEstimatedWrittenSize();
}
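
/**
* Flushes any buffered records, writes the file footer, and closes the
* underlying output file. A file that has not been closed is not a valid
* Parquet file.
*
* @throws java.io.IOException if the file cannot be closed
*/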
@Override
public void close() throws IOException {
try {
writer.close();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}