| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.parquet.hadoop; |
| |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configuration; |
| |
| import org.apache.parquet.HadoopReadOptions; |
| import org.apache.parquet.ParquetReadOptions; |
| import org.apache.parquet.column.page.PageReadStore; |
| import org.apache.parquet.filter.UnboundRecordFilter; |
| import org.apache.parquet.filter2.compat.FilterCompat; |
| import org.apache.parquet.filter2.compat.FilterCompat.Filter; |
| import org.apache.parquet.hadoop.api.InitContext; |
| import org.apache.parquet.hadoop.api.ReadSupport; |
| import org.apache.parquet.hadoop.metadata.FileMetaData; |
| import org.apache.parquet.hadoop.util.counters.BenchmarkCounter; |
| import org.apache.parquet.io.ColumnIOFactory; |
| import org.apache.parquet.io.MessageColumnIO; |
| import org.apache.parquet.io.ParquetDecodingException; |
| import org.apache.parquet.io.api.RecordMaterializer; |
| import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; |
| import org.apache.parquet.schema.MessageType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static java.lang.String.format; |
| import static org.apache.parquet.Preconditions.checkNotNull; |
| import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; |
| import static org.apache.parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING; |
| |
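| /** |
| * Core record-assembly loop shared by the Hadoop-facing readers: iterates over the |
| * row groups of a single Parquet file, materializes records through the supplied |
| * {@link ReadSupport}, and applies optional record-level filtering. |
| */ |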
| class InternalParquetRecordReader<T> { |
| private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordReader.class); |
| |
| private ColumnIOFactory columnIOFactory = null; |
| private final Filter filter; |
| private boolean filterRecords = true; |
| |
| private MessageType requestedSchema; |
| private MessageType fileSchema; |
| private int columnCount; |
| private final ReadSupport<T> readSupport; |
| |
| private RecordMaterializer<T> recordConverter; |
| |
| private T currentValue; |
| private long total; |
| private long current = 0; |
| private int currentBlock = -1; |
| private ParquetFileReader reader; |
| private org.apache.parquet.io.RecordReader<T> recordReader; |
| private boolean strictTypeChecking; |
| |
| private long totalTimeSpentReadingBytes; |
| private long totalTimeSpentProcessingRecords; |
| private long startedAssemblingCurrentBlockAt; |
| |
| private long totalCountLoadedSoFar = 0; |
| |
| private UnmaterializableRecordCounter unmaterializableRecordCounter; |
| |
| /** |
| * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro. |
| * @param filter Filter for selecting individual records. |
| */ |
| public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) { |
| this.readSupport = readSupport; |
| this.filter = checkNotNull(filter, "filter"); |
| } |
| |
| /** |
| * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro. |
| */ |
| public InternalParquetRecordReader(ReadSupport<T> readSupport) { |
| this(readSupport, FilterCompat.NOOP); |
| } |
| |
| /** |
| * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro. |
| * @param filter Optional filter for only returning matching records. |
| * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)} |
| */ |
| @Deprecated |
| public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) { |
| this(readSupport, FilterCompat.get(filter)); |
| } |
| |
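| /** |
| * When the current row group is exhausted, logs assembly timing for the block just |
| * processed, reads the next (possibly filtered) row group into memory, and rebuilds |
| * the record reader over its pages. |
| */ |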
| private void checkRead() throws IOException { |
| if (current == totalCountLoadedSoFar) { |
| if (current != 0) { |
| totalTimeSpentProcessingRecords += (System.currentTimeMillis() - startedAssemblingCurrentBlockAt); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in " + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords) + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms"); |
| final long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes; |
| if (totalTime != 0) { |
| final long percentReading = 100 * totalTimeSpentReadingBytes / totalTime; |
| final long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime; |
| LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " + percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)"); |
| } |
| } |
| } |
| |
| LOG.info("at row " + current + ". reading next block"); |
| long t0 = System.currentTimeMillis(); |
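| // pull the next row group into memory; with a filter present, rows may already be pruned via column indexes |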
| PageReadStore pages = reader.readNextFilteredRowGroup(); |
| if (pages == null) { |
| throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total); |
| } |
| long timeSpentReading = System.currentTimeMillis() - t0; |
| totalTimeSpentReadingBytes += timeSpentReading; |
| BenchmarkCounter.incrementTime(timeSpentReading); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("block read in memory in {} ms. row count = {}", timeSpentReading, pages.getRowCount()); |
| } |
| LOG.debug("initializing Record assembly with requested schema {}", requestedSchema); |
| MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking); |
| recordReader = columnIO.getRecordReader(pages, recordConverter, |
| filterRecords ? filter : FilterCompat.NOOP); |
| startedAssemblingCurrentBlockAt = System.currentTimeMillis(); |
| totalCountLoadedSoFar += pages.getRowCount(); |
| ++currentBlock; |
| } |
| } |
| |
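| /** Closes the underlying file reader, if one was opened. */ |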
| public void close() throws IOException { |
| if (reader != null) { |
| reader.close(); |
| } |
| } |
| |
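| /** Parquet records have no key component, so the current key is always {@code null}. */ |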
| public Void getCurrentKey() throws IOException, InterruptedException { |
| return null; |
| } |
| |
| public T getCurrentValue() throws IOException, |
| InterruptedException { |
| return currentValue; |
| } |
| |
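| /** Returns the fraction of the (possibly filtered) total record count that has been read. */ |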
| public float getProgress() throws IOException, InterruptedException { |
| return (float) current / total; |
| } |
| |
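| /** |
| * Initializes the reader from pre-built {@link ParquetReadOptions}: the options' |
| * properties are copied into the Configuration handed to the {@link ReadSupport}, |
| * the requested schema is resolved, and the (filtered) total record count is computed. |
| */ |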
| public void initialize(ParquetFileReader reader, ParquetReadOptions options) { |
| // copy custom configuration to the Configuration passed to the ReadSupport |
| Configuration conf = new Configuration(); |
| if (options instanceof HadoopReadOptions) { |
| conf = ((HadoopReadOptions) options).getConf(); |
| } |
| for (String property : options.getPropertyNames()) { |
| conf.set(property, options.getProperty(property)); |
| } |
| |
| // initialize a ReadContext for this file |
| this.reader = reader; |
| FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); |
| this.fileSchema = parquetFileMetadata.getSchema(); |
| Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData(); |
| ReadSupport.ReadContext readContext = readSupport.init(new InitContext(conf, toSetMultiMap(fileMetadata), fileSchema)); |
| this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); |
| this.requestedSchema = readContext.getRequestedSchema(); |
| this.columnCount = requestedSchema.getPaths().size(); |
| this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext); |
| this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true); |
| this.total = reader.getFilteredRecordCount(); |
| this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(options, total); |
| this.filterRecords = options.useRecordFilter(); |
| reader.setRequestedSchema(requestedSchema); |
| LOG.info("RecordReader initialized will read a total of {} records.", total); |
| } |
| |
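| /** |
| * Configuration-based variant of {@link #initialize(ParquetFileReader, ParquetReadOptions)}; |
| * strict type checking and record filtering are read directly from the Hadoop Configuration. |
| */ |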
| public void initialize(ParquetFileReader reader, Configuration configuration) |
| throws IOException { |
| // initialize a ReadContext for this file |
| this.reader = reader; |
| FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); |
| this.fileSchema = parquetFileMetadata.getSchema(); |
| Map<String, String> fileMetadata = parquetFileMetadata.getKeyValueMetaData(); |
| ReadSupport.ReadContext readContext = readSupport.init(new InitContext( |
| configuration, toSetMultiMap(fileMetadata), fileSchema)); |
| this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); |
| this.requestedSchema = readContext.getRequestedSchema(); |
| this.columnCount = requestedSchema.getPaths().size(); |
| this.recordConverter = readSupport.prepareForRead( |
| configuration, fileMetadata, fileSchema, readContext); |
| this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true); |
| this.total = reader.getFilteredRecordCount(); |
| this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total); |
| this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true); |
| reader.setRequestedSchema(requestedSchema); |
| LOG.info("RecordReader initialized will read a total of {} records.", total); |
| } |
| |
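| /** |
| * Advances to the next record, transparently skipping records that the filter drops |
| * and records that fail to materialize (tolerated up to the configured threshold). |
| * |
| * @return true if a record was read, false once all records have been consumed |
| */ |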
| public boolean nextKeyValue() throws IOException, InterruptedException { |
| boolean recordFound = false; |
| |
| while (!recordFound) { |
| // no more records left |
| if (current >= total) { return false; } |
| |
| try { |
| checkRead(); |
| current++; |
| |
| try { |
| currentValue = recordReader.read(); |
| } catch (RecordMaterializationException e) { |
| // incErrors re-throws once the configured bad-record threshold is exceeded, which is fatal |
| unmaterializableRecordCounter.incErrors(e); |
| LOG.debug("skipping a corrupt record"); |
| continue; |
| } |
| |
| if (recordReader.shouldSkipCurrentRecord()) { |
| // this record is being filtered via the filter2 package |
| LOG.debug("skipping record"); |
| continue; |
| } |
| |
| if (currentValue == null) { |
| // only happens with FilteredRecordReader at end of block |
| current = totalCountLoadedSoFar; |
| LOG.debug("filtered record reader reached end of block"); |
| continue; |
| } |
| |
| recordFound = true; |
| |
| LOG.debug("read value: {}", currentValue); |
| } catch (RuntimeException e) { |
| throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, reader.getPath()), e); |
| } |
| } |
| return true; |
| } |
| |
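| /** Wraps each value of the given map in an immutable singleton set, producing an immutable multimap view. */ |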
| private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) { |
| Map<K, Set<V>> setMultiMap = new HashMap<K, Set<V>>(); |
| for (Map.Entry<K, V> entry : map.entrySet()) { |
| Set<V> set = new HashSet<V>(); |
| set.add(entry.getValue()); |
| setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set)); |
| } |
| return Collections.unmodifiableMap(setMultiMap); |
| } |
| |
| } |