| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.parquet.cli.commands; |
| |
| import com.beust.jcommander.Parameter; |
| import com.beust.jcommander.Parameters; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import org.apache.parquet.cli.BaseCommand; |
| import org.apache.commons.text.TextStringBuilder; |
| import org.apache.parquet.column.ColumnDescriptor; |
| import org.apache.parquet.column.page.DataPage; |
| import org.apache.parquet.column.page.DataPageV1; |
| import org.apache.parquet.column.page.DataPageV2; |
| import org.apache.parquet.column.page.DictionaryPage; |
| import org.apache.parquet.column.page.Page; |
| import org.apache.parquet.column.page.PageReadStore; |
| import org.apache.parquet.column.page.PageReader; |
| import org.apache.parquet.hadoop.ParquetFileReader; |
| import org.apache.parquet.hadoop.metadata.CompressionCodecName; |
| import org.apache.parquet.schema.MessageType; |
| import org.apache.parquet.schema.PrimitiveType; |
| import org.slf4j.Logger; |
| import java.io.IOException; |
| import java.util.List; |
| import java.util.Map; |
| |
| import static org.apache.parquet.cli.Util.columnName; |
| import static org.apache.parquet.cli.Util.descriptor; |
| import static org.apache.parquet.cli.Util.encodingAsString; |
| import static org.apache.parquet.cli.Util.humanReadable; |
| import static org.apache.parquet.cli.Util.minMaxAsString; |
| import static org.apache.parquet.cli.Util.primitive; |
| import static org.apache.parquet.cli.Util.shortCodec; |
| |
| @Parameters(commandDescription="Print page summaries for a Parquet file") |
| public class ShowPagesCommand extends BaseCommand { |
| |
| public ShowPagesCommand(Logger console) { |
| super(console); |
| } |
| |
| @Parameter(description = "<parquet path>") |
| List<String> targets; |
| |
| @Parameter( |
| names = {"-c", "--column", "--columns"}, |
| description = "List of columns") |
| List<String> columns; |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public int run() throws IOException { |
| Preconditions.checkArgument(targets != null && targets.size() >= 1, |
| "A Parquet file is required."); |
| Preconditions.checkArgument(targets.size() == 1, |
| "Cannot process multiple Parquet files."); |
| |
| String source = targets.get(0); |
| try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) { |
| MessageType schema = reader.getFileMetaData().getSchema(); |
| Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap(); |
| if (this.columns == null || this.columns.isEmpty()) { |
| for (ColumnDescriptor descriptor : schema.getColumns()) { |
| columns.put(descriptor, primitive(schema, descriptor.getPath())); |
| } |
| } else { |
| for (String column : this.columns) { |
| columns.put(descriptor(column, schema), primitive(column, schema)); |
| } |
| } |
| |
| CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec(); |
| // accumulate formatted lines to print by column |
| Map<String, List<String>> formatted = Maps.newLinkedHashMap(); |
| PageFormatter formatter = new PageFormatter(); |
| PageReadStore pageStore; |
| int rowGroupNum = 0; |
| while ((pageStore = reader.readNextRowGroup()) != null) { |
| for (ColumnDescriptor descriptor : columns.keySet()) { |
| List<String> lines = formatted.get(columnName(descriptor)); |
| if (lines == null) { |
| lines = Lists.newArrayList(); |
| formatted.put(columnName(descriptor), lines); |
| } |
| |
| formatter.setContext(rowGroupNum, columns.get(descriptor), codec); |
| PageReader pages = pageStore.getPageReader(descriptor); |
| |
| DictionaryPage dict = pages.readDictionaryPage(); |
| if (dict != null) { |
| lines.add(formatter.format(dict)); |
| } |
| DataPage page; |
| while ((page = pages.readPage()) != null) { |
| lines.add(formatter.format(page)); |
| } |
| } |
| rowGroupNum += 1; |
| } |
| |
| // TODO: Show total column size and overall size per value in the column summary line |
| for (String columnName : formatted.keySet()) { |
| console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-'))); |
| console.info(formatter.getHeader()); |
| for (String line : formatted.get(columnName)) { |
| console.info(line); |
| } |
| console.info(""); |
| } |
| } |
| |
| return 0; |
| } |
| |
| @Override |
| public List<String> getExamples() { |
| return Lists.newArrayList( |
| "# Show pages for column 'col' from a Parquet file", |
| "-c col sample.parquet" |
| ); |
| } |
| |
| private class PageFormatter implements DataPage.Visitor<String> { |
| private int rowGroupNum; |
| private int pageNum; |
| private PrimitiveType type; |
| private String shortCodec; |
| |
| String getHeader() { |
| return String.format(" %-6s %-5s %-4s %-7s %-10s %-10s %-8s %-7s %s", |
| "page", "type", "enc", "count", "avg size", "size", "rows", "nulls", "min / max"); |
| } |
| |
| void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec) { |
| this.rowGroupNum = rowGroupNum; |
| this.pageNum = 0; |
| this.type = type; |
| this.shortCodec = shortCodec(codec); |
| } |
| |
| String format(Page page) { |
| String formatted = ""; |
| if (page instanceof DictionaryPage) { |
| formatted = printDictionaryPage((DictionaryPage) page); |
| } else if (page instanceof DataPage) { |
| formatted = ((DataPage) page).accept(this); |
| } |
| pageNum += 1; |
| return formatted; |
| } |
| |
| private String printDictionaryPage(DictionaryPage dict) { |
| // TODO: the compressed size of a dictionary page is lost in Parquet |
| dict.getUncompressedSize(); |
| long totalSize = dict.getCompressedSize(); |
| int count = dict.getDictionarySize(); |
| float perValue = ((float) totalSize) / count; |
| String enc = encodingAsString(dict.getEncoding(), true); |
| if (pageNum == 0) { |
| return String.format("%3d-D %-5s %s %-2s %-7d %-10s %-10s", |
| rowGroupNum, "dict", shortCodec, enc, count, humanReadable(perValue), |
| humanReadable(totalSize)); |
| } else { |
| return String.format("%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s", |
| rowGroupNum, pageNum, "dict", shortCodec, enc, count, humanReadable(perValue), |
| humanReadable(totalSize)); |
| } |
| } |
| |
| @Override |
| public String visit(DataPageV1 page) { |
| String enc = encodingAsString(page.getValueEncoding(), false); |
| long totalSize = page.getCompressedSize(); |
| int count = page.getValueCount(); |
| String numNulls = page.getStatistics().isNumNullsSet() ? Long.toString(page.getStatistics().getNumNulls()) : ""; |
| float perValue = ((float) totalSize) / count; |
| String minMax = minMaxAsString(page.getStatistics()); |
| return String.format("%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s", |
| rowGroupNum, pageNum, "data", shortCodec, enc, count, humanReadable(perValue), |
| humanReadable(totalSize), "", numNulls, minMax); |
| } |
| |
| @Override |
| public String visit(DataPageV2 page) { |
| String enc = encodingAsString(page.getDataEncoding(), false); |
| long totalSize = page.getCompressedSize(); |
| int count = page.getValueCount(); |
| int numRows = page.getRowCount(); |
| int numNulls = page.getNullCount(); |
| float perValue = ((float) totalSize) / count; |
| String minMax = minMaxAsString(page.getStatistics()); |
| String compression = (page.isCompressed() ? shortCodec : "_"); |
| return String.format("%3d-%-3d %-5s %s %-2s %-7d %-10s %-10s %-8d %-7s %s", |
| rowGroupNum, pageNum, "data", compression, enc, count, humanReadable(perValue), |
| humanReadable(totalSize), numRows, numNulls, minMax); |
| } |
| } |
| } |