/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.cli.commands;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.parquet.cli.BaseCommand;
import org.apache.commons.text.TextStringBuilder;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.Page;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.slf4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.apache.parquet.cli.Util.columnName;
import static org.apache.parquet.cli.Util.descriptor;
import static org.apache.parquet.cli.Util.encodingAsString;
import static org.apache.parquet.cli.Util.humanReadable;
import static org.apache.parquet.cli.Util.minMaxAsString;
import static org.apache.parquet.cli.Util.primitive;
import static org.apache.parquet.cli.Util.shortCodec;
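
/**
 * Prints per-page summaries (encoding, value count, size, row count, null
 * count, and min/max statistics) for each column of a Parquet file.
 *
 * <p>Invoked from the parquet-cli launcher, e.g. (assuming the command is
 * registered under the name {@code pages}):
 * <pre>parquet pages -c col sample.parquet</pre>
 */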
@Parameters(commandDescription = "Print page summaries for a Parquet file")
public class ShowPagesCommand extends BaseCommand {

  public ShowPagesCommand(Logger console) {
    super(console);
  }

  @Parameter(description = "<parquet path>")
  List<String> targets;

  @Parameter(
      names = {"-c", "--column", "--columns"},
      description = "List of columns")
  List<String> columns;

  @Override
  @SuppressWarnings("unchecked")
  public int run() throws IOException {
    Preconditions.checkArgument(targets != null && targets.size() >= 1,
        "A Parquet file is required.");
    Preconditions.checkArgument(targets.size() == 1,
        "Cannot process multiple Parquet files.");

    String source = targets.get(0);

    try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
      MessageType schema = reader.getFileMetaData().getSchema();

      // select the leaf columns to describe: all columns by default, or the
      // subset requested with -c/--column
      Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
      if (this.columns == null || this.columns.isEmpty()) {
        for (ColumnDescriptor descriptor : schema.getColumns()) {
          columns.put(descriptor, primitive(schema, descriptor.getPath()));
        }
      } else {
        for (String column : this.columns) {
          columns.put(descriptor(column, schema), primitive(column, schema));
        }
      }
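
      // NOTE: the codec is read from the first column chunk of the first row
      // group and is assumed to be the same across the whole file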
      CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();

      // accumulate formatted lines to print by column
      Map<String, List<String>> formatted = Maps.newLinkedHashMap();
      PageFormatter formatter = new PageFormatter();

      PageReadStore pageStore;
      int rowGroupNum = 0;
      while ((pageStore = reader.readNextRowGroup()) != null) {
        for (ColumnDescriptor descriptor : columns.keySet()) {
          List<String> lines = formatted.get(columnName(descriptor));
          if (lines == null) {
            lines = Lists.newArrayList();
            formatted.put(columnName(descriptor), lines);
          }

          // page numbering restarts at 0 for each column chunk
          formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
          PageReader pages = pageStore.getPageReader(descriptor);

          DictionaryPage dict = pages.readDictionaryPage();
          if (dict != null) {
            lines.add(formatter.format(dict));
          }
          DataPage page;
          while ((page = pages.readPage()) != null) {
            lines.add(formatter.format(page));
          }
        }
        rowGroupNum += 1;
      }

      // TODO: Show total column size and overall size per value in the column summary line
      for (String columnName : formatted.keySet()) {
        console.info(String.format("\nColumn: %s\n%s",
            columnName, new TextStringBuilder(80).appendPadding(80, '-')));
        console.info(formatter.getHeader());
        for (String line : formatted.get(columnName)) {
          console.info(line);
        }
        console.info("");
      }
    }

    return 0;
  }

  @Override
  public List<String> getExamples() {
    return Lists.newArrayList(
        "# Show pages for column 'col' from a Parquet file",
        "-c col sample.parquet"
    );
  }
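
  /**
   * Formats dictionary and data pages as fixed-width table rows. The first
   * column is {@code rowGroup-pageNum}; a dictionary page at the start of a
   * column chunk is shown as {@code rowGroup-D}.
   */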
  private class PageFormatter implements DataPage.Visitor<String> {
    private int rowGroupNum;
    private int pageNum;
    private PrimitiveType type;
    private String shortCodec;

    String getHeader() {
      return String.format("  %-6s %-5s %-4s %-7s %-10s %-10s %-8s %-7s %s",
          "page", "type", "enc", "count", "avg size", "size", "rows", "nulls", "min / max");
    }

    void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec) {
      this.rowGroupNum = rowGroupNum;
      this.pageNum = 0;
      this.type = type;
      this.shortCodec = shortCodec(codec);
    }

    String format(Page page) {
      String formatted = "";
      if (page instanceof DictionaryPage) {
        formatted = printDictionaryPage((DictionaryPage) page);
      } else if (page instanceof DataPage) {
        formatted = ((DataPage) page).accept(this);
      }
      pageNum += 1;
      return formatted;
    }

    private String printDictionaryPage(DictionaryPage dict) {
      // TODO: the compressed size of a dictionary page is lost in Parquet
      dict.getUncompressedSize();
      long totalSize = dict.getCompressedSize();
      int count = dict.getDictionarySize();
      float perValue = ((float) totalSize) / count;
      String enc = encodingAsString(dict.getEncoding(), true);
      if (pageNum == 0) {
        return String.format("%3d-D    %-5s %s %-2s %-7d %-10s %-10s",
            rowGroupNum, "dict", shortCodec, enc, count, humanReadable(perValue),
            humanReadable(totalSize));
      } else {
        return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s",
            rowGroupNum, pageNum, "dict", shortCodec, enc, count, humanReadable(perValue),
            humanReadable(totalSize));
      }
    }

    @Override
    public String visit(DataPageV1 page) {
      String enc = encodingAsString(page.getValueEncoding(), false);
      long totalSize = page.getCompressedSize();
      int count = page.getValueCount();
      String numNulls = page.getStatistics().isNumNullsSet()
          ? Long.toString(page.getStatistics().getNumNulls())
          : "";
      float perValue = ((float) totalSize) / count;
      String minMax = minMaxAsString(page.getStatistics());
      // v1 data pages do not carry a per-page row count, so "rows" is left blank
      return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8s %-7s %s",
          rowGroupNum, pageNum, "data", shortCodec, enc, count, humanReadable(perValue),
          humanReadable(totalSize), "", numNulls, minMax);
    }

    @Override
    public String visit(DataPageV2 page) {
      String enc = encodingAsString(page.getDataEncoding(), false);
      long totalSize = page.getCompressedSize();
      int count = page.getValueCount();
      int numRows = page.getRowCount();
      int numNulls = page.getNullCount();
      float perValue = ((float) totalSize) / count;
      String minMax = minMaxAsString(page.getStatistics());
      // v2 pages may be stored uncompressed even when the column chunk has a codec
      String compression = (page.isCompressed() ? shortCodec : "_");
      return String.format("%3d-%-3d  %-5s %s %-2s %-7d %-10s %-10s %-8d %-7s %s",
          rowGroupNum, pageNum, "data", compression, enc, count, humanReadable(perValue),
          humanReadable(totalSize), numRows, numNulls, minMax);
    }
  }
}