/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.parquet;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
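
/**
 * Writes records to a Parquet file through the {@link FileAppender} interface.
 * <p>
 * Records are buffered in a {@link ColumnWriteStore} and written out as a row group whenever the
 * buffered size approaches the target row group size. {@link Metrics} for the written file are
 * derived from the Parquet footer and the value writers.
 */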
class ParquetWriter<T> implements FileAppender<T>, Closeable {
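
  // ColumnChunkPageWriteStore is package-private in parquet-mr, so its constructor is looked up
  // reflectively; the trailing int argument is the column index truncate length.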
  private static final DynConstructors.Ctor<PageWriteStore> pageStoreCtorParquet = DynConstructors
.builder(PageWriteStore.class)
.hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore",
CodecFactory.BytesCompressor.class,
MessageType.class,
ByteBufferAllocator.class,
int.class)
.build();
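
  // flushToFileWriter is also package-private; it writes the buffered pages of the current row
  // group to the underlying ParquetFileWriter.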
private static final DynMethods.UnboundMethod flushToWriter = DynMethods
.builder("flushToFileWriter")
.hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore", ParquetFileWriter.class)
.build();
private final long targetRowGroupSize;
private final Map<String, String> metadata;
private final ParquetProperties props;
private final CodecFactory.BytesCompressor compressor;
private final MessageType parquetSchema;
private final ParquetValueWriter<T> model;
private final ParquetFileWriter writer;
private final MetricsConfig metricsConfig;
private final int columnIndexTruncateLength;
private DynMethods.BoundMethod flushPageStoreToWriter;
private ColumnWriteStore writeStore;
private long nextRowGroupSize = 0;
private long recordCount = 0;
private long nextCheckRecordCount = 10;
private boolean closed;
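
  // Same property name and default as parquet-hadoop's column index truncate length setting.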
private static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
private static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
@SuppressWarnings("unchecked")
ParquetWriter(Configuration conf, OutputFile output, Schema schema, long rowGroupSize,
Map<String, String> metadata,
Function<MessageType, ParquetValueWriter<?>> createWriterFunc,
CompressionCodecName codec,
ParquetProperties properties,
MetricsConfig metricsConfig,
ParquetFileWriter.Mode writeMode) {
this.targetRowGroupSize = rowGroupSize;
this.props = properties;
this.metadata = ImmutableMap.copyOf(metadata);
this.compressor = new CodecFactory(conf, props.getPageSizeThreshold()).getCompressor(codec);
this.parquetSchema = ParquetSchemaUtil.convert(schema, "table");
this.model = (ParquetValueWriter<T>) createWriterFunc.apply(parquetSchema);
this.metricsConfig = metricsConfig;
this.columnIndexTruncateLength = conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
try {
this.writer = new ParquetFileWriter(ParquetIO.file(output, conf), parquetSchema,
writeMode, rowGroupSize, 0);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create Parquet file");
}
try {
writer.start();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to start Parquet file writer");
}
startRowGroup();
}
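
  // Writes the record into the current row group and checks whether the group should be flushed.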
@Override
public void add(T value) {
recordCount += 1;
model.write(0, value);
writeStore.endRecord();
checkSize();
}
@Override
public Metrics metrics() {
return ParquetUtil.footerMetrics(writer.getFooter(), model.metrics(), metricsConfig);
}
/**
* Returns the approximate length of the output file produced by this writer.
* <p>
* Prior to calling {@link ParquetWriter#close}, the result is approximate. After calling close, the length is
* exact.
*
* @return the approximate length of the output file produced by this writer or the exact length if this writer is
* closed.
*/
@Override
public long length() {
try {
if (closed) {
return writer.getPos();
} else {
return writer.getPos() + (writeStore.isColumnFlushNeeded() ? writeStore.getBufferedSize() : 0);
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to get file length");
}
}
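
  // Row-group start offsets read from the Parquet file footer.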
@Override
public List<Long> splitOffsets() {
return ParquetUtil.getSplitOffsets(writer.getFooter());
}
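
  // Estimates the average record size from the bytes buffered so far; flushes the row group when
  // the buffer is within about two records of the target size, otherwise schedules the next check.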
private void checkSize() {
if (recordCount >= nextCheckRecordCount) {
long bufferedSize = writeStore.getBufferedSize();
double avgRecordSize = ((double) bufferedSize) / recordCount;
if (bufferedSize > (nextRowGroupSize - 2 * avgRecordSize)) {
flushRowGroup(false);
} else {
long remainingSpace = nextRowGroupSize - bufferedSize;
long remainingRecords = (long) (remainingSpace / avgRecordSize);
this.nextCheckRecordCount = recordCount + Math.min(Math.max(remainingRecords / 2, 100), 10000);
}
}
}
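
  // Writes the buffered pages out as a row group. When this is the final flush (on close), no new
  // row group is started afterwards.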
private void flushRowGroup(boolean finished) {
try {
if (recordCount > 0) {
writer.startBlock(recordCount);
writeStore.flush();
flushPageStoreToWriter.invoke(writer);
writer.endBlock();
if (!finished) {
startRowGroup();
}
}
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to flush row group");
}
}
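
  // Starts a new row group: resets counters, creates a fresh page store through reflection, and
  // points the value writers at a new ColumnWriteStore.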
private void startRowGroup() {
Preconditions.checkState(!closed, "Writer is closed");
try {
this.nextRowGroupSize = Math.min(writer.getNextRowGroupSize(), targetRowGroupSize);
} catch (IOException e) {
throw new RuntimeIOException(e);
}
this.nextCheckRecordCount = Math.min(Math.max(recordCount / 2, 100), 10000);
this.recordCount = 0;
PageWriteStore pageStore = pageStoreCtorParquet.newInstance(
compressor, parquetSchema, props.getAllocator(), this.columnIndexTruncateLength);
this.flushPageStoreToWriter = flushToWriter.bind(pageStore);
this.writeStore = props.newColumnWriteStore(parquetSchema, pageStore);
model.setColumnStore(writeStore);
}
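
  // Flushes the last row group, closes the write store, and writes the footer with the provided
  // key-value metadata. Safe to call more than once.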
@Override
public void close() throws IOException {
if (!closed) {
this.closed = true;
flushRowGroup(true);
writeStore.close();
writer.end(metadata);
}
}
}