| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.storage.thirdparty.parquet; |
| |
| import parquet.Log; |
| import parquet.column.ParquetProperties.WriterVersion; |
| import parquet.column.impl.ColumnWriteStoreImpl; |
| import parquet.hadoop.api.WriteSupport; |
| import parquet.io.ColumnIOFactory; |
| import parquet.io.MessageColumnIO; |
| import parquet.schema.MessageType; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| |
| import static java.lang.Math.max; |
| import static java.lang.Math.min; |
| import static java.lang.String.format; |
| import static org.apache.tajo.storage.thirdparty.parquet.CodecFactory.BytesCompressor; |
| import static parquet.Log.DEBUG; |
| import static parquet.Preconditions.checkNotNull; |
| |
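/**
 * Buffers records in memory and flushes them to the underlying {@link ParquetFileWriter}
 * as row groups (blocks) once the buffered data approaches the configured block size.
 */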
| class InternalParquetRecordWriter<T> { |
| private static final Log LOG = Log.getLog(InternalParquetRecordWriter.class); |
| |
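  // Sizing heuristics: column buffers never start smaller than MINIMUM_BUFFER_SIZE, and the
  // relatively expensive memory check in checkBlockSizeReached() runs no sooner than
  // MINIMUM_RECORD_COUNT_FOR_CHECK records after a flush and at most
  // MAXIMUM_RECORD_COUNT_FOR_CHECK records after the previous check.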
| private static final int MINIMUM_BUFFER_SIZE = 64 * 1024; |
| private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100; |
| private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000; |
| |
| private final ParquetFileWriter w; |
| private final WriteSupport<T> writeSupport; |
| private final MessageType schema; |
| private final Map<String, String> extraMetaData; |
| private final int blockSize; |
| private final int pageSize; |
| private final BytesCompressor compressor; |
| private final int dictionaryPageSize; |
| private final boolean enableDictionary; |
| private final boolean validating; |
| private final WriterVersion writerVersion; |
| |
| private long recordCount = 0; |
| private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK; |
| |
| private ColumnWriteStoreImpl store; |
| private ColumnChunkPageWriteStore pageStore; |
| |
| /** |
| * @param w the file to write to |
| * @param writeSupport the class to convert incoming records |
| * @param schema the schema of the records |
| * @param extraMetaData extra meta data to write in the footer of the file |
| * @param blockSize the size of a block in the file (this will be approximate) |
| * @param codec the codec used to compress |
| */ |
| public InternalParquetRecordWriter( |
| ParquetFileWriter w, |
| WriteSupport<T> writeSupport, |
| MessageType schema, |
| Map<String, String> extraMetaData, |
| int blockSize, |
| int pageSize, |
| BytesCompressor compressor, |
| int dictionaryPageSize, |
| boolean enableDictionary, |
| boolean validating, |
| WriterVersion writerVersion) { |
| this.w = w; |
| this.writeSupport = checkNotNull(writeSupport, "writeSupport"); |
| this.schema = schema; |
| this.extraMetaData = extraMetaData; |
| this.blockSize = blockSize; |
| this.pageSize = pageSize; |
| this.compressor = compressor; |
| this.dictionaryPageSize = dictionaryPageSize; |
| this.enableDictionary = enableDictionary; |
| this.validating = validating; |
| this.writerVersion = writerVersion; |
| initStore(); |
| } |
| |
| private void initStore() { |
    // We don't want this number to be too small: ideally the block is divided equally across
    // the columns, but since all columns are unlikely to be the same size, start each buffer
    // at one fifth of its equal share and let it grow as needed.
| int initialBlockBufferSize = max(MINIMUM_BUFFER_SIZE, blockSize / schema.getColumns().size() / 5); |
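    // e.g., a 128 MiB block over 20 columns: max(64 KiB, 128 MiB / 20 / 5) = ~1.28 MiB per column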
| pageStore = new ColumnChunkPageWriteStore(compressor, schema, initialBlockBufferSize); |
    // We don't want this number to be too small either: ideally slightly bigger than the
    // page size (pageSize plus 10%), but no bigger than the block buffer.
| int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize)); |
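    // e.g., a 1 MiB page size with a larger block buffer: max(64 KiB, min(1 MiB + 10%, blockBuffer)) = ~1.1 MiB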
| store = new ColumnWriteStoreImpl(pageStore, pageSize, initialPageBufferSize, dictionaryPageSize, enableDictionary, writerVersion); |
| MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); |
| writeSupport.prepareForWrite(columnIO.getRecordWriter(store)); |
| } |
| |
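  /**
   * Flushes any remaining buffered records as a final row group and writes the file footer,
   * including the extra metadata.
   */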
| public void close() throws IOException, InterruptedException { |
| flushStore(); |
| w.end(extraMetaData); |
| } |
| |
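  /**
   * Writes a single record and periodically checks whether the buffered data has reached
   * the block size.
   */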
| public void write(T value) throws IOException, InterruptedException { |
| writeSupport.write(value); |
    ++recordCount;
| checkBlockSizeReached(); |
| } |
| |
| private void checkBlockSizeReached() throws IOException { |
    // checking the memory size is relatively expensive, so we don't do it for every record
    if (recordCount >= recordCountForNextMemCheck) {
| long memSize = store.memSize(); |
| if (memSize > blockSize) { |
| if (DEBUG) LOG.debug(format("mem size %,d > %,d: flushing %,d records to disk.", memSize, blockSize, recordCount)); |
| flushStore(); |
| initStore(); |
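        // recordCount was reset to 0 by flushStore(), so this resolves to
        // MINIMUM_RECORD_COUNT_FOR_CHECK: check again soon after starting the new block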
| recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK); |
| } else { |
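        // Extrapolate from the average record size to estimate when the block will be about
        // half full and schedule the next check there: e.g., 32 MiB buffered after 1,000
        // records with a 128 MiB block gives blockSize / recordSize = 4,000, so the next
        // check happens at (1,000 + 4,000) / 2 = 2,500 records.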
| float recordSize = (float) memSize / recordCount; |
| recordCountForNextMemCheck = min( |
| max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway |
| recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead |
| ); |
| if (DEBUG) LOG.debug(format("Checked mem at %,d will check again at: %,d ", recordCount, recordCountForNextMemCheck)); |
| } |
| } |
| } |
| |
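  /**
   * @return the number of bytes already written to the file plus the bytes currently
   *         buffered in memory
   */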
| public long getEstimatedWrittenSize() throws IOException { |
| return w.getPos() + store.memSize(); |
| } |
| |
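  /**
   * Writes the buffered column data to the file as one row group and releases the stores;
   * initStore() must be called again before more records can be written.
   */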
  private void flushStore() throws IOException {
| if (DEBUG) LOG.debug(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize())); |
    // 3L avoids int overflow when blockSize is large
    if (store.allocatedSize() > 3L * blockSize) {
| LOG.warn("Too much memory used: " + store.memUsageString()); |
| } |
| w.startBlock(recordCount); |
| store.flush(); |
| pageStore.flushToFileWriter(w); |
| recordCount = 0; |
| w.endBlock(); |
| store = null; |
| pageStore = null; |
| } |
| } |