/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HiddenFileFilter;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.MessageType;

/**
 * Utility to print footer information.
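 *
 * Usage sketch (the launcher shown is illustrative; any classpath that
 * contains parquet-hadoop and the Hadoop client jars will do):
 * {@code java -cp <classpath> org.apache.parquet.hadoop.PrintFooter <path>}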
 */
public class PrintFooter {

  public static void main(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("usage: PrintFooter <path>");
      return;
    }
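    // The argument may be a single Parquet file or a directory of Parquet
    // files, possibly carrying a _metadata summary file.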
    Path path = new Path(new URI(args[0]));
    final Configuration configuration = new Configuration();
    final FileSystem fs = path.getFileSystem(configuration);
    FileStatus fileStatus = fs.getFileStatus(path);
    Path summary = new Path(fileStatus.getPath(), PARQUET_METADATA_FILE);
    if (fileStatus.isDirectory() && fs.exists(summary)) {
      // A _metadata summary file holds the footers of every file in the
      // directory, so one read replaces opening each file individually.
      System.out.println("reading summary file");
      FileStatus summaryStatus = fs.getFileStatus(summary);
      List<Footer> readSummaryFile = ParquetFileReader.readSummaryFile(configuration, summaryStatus);
      for (Footer footer : readSummaryFile) {
        add(footer.getParquetMetadata());
      }
    } else {
      List<FileStatus> statuses;
      if (fileStatus.isDirectory()) {
        System.out.println("listing files in " + fileStatus.getPath());
        statuses = Arrays.asList(fs.listStatus(fileStatus.getPath(), HiddenFileFilter.INSTANCE));
      } else {
        statuses = new ArrayList<FileStatus>();
        statuses.add(fileStatus);
      }
      System.out.println("opening " + statuses.size() + " files");
      int i = 0;
      ExecutorService threadPool = Executors.newFixedThreadPool(5);
      try {
        long t0 = System.currentTimeMillis();
        // Submit one footer-reading task per file; results are drained below.
        Deque<Future<ParquetMetadata>> footers = new LinkedBlockingDeque<Future<ParquetMetadata>>();
        for (final FileStatus currentFile : statuses) {
          footers.add(threadPool.submit(() -> {
            try {
              return ParquetFileReader.readFooter(configuration, currentFile, NO_FILTER);
            } catch (Exception e) {
              throw new ParquetDecodingException("could not read footer", e);
            }
          }));
        }
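        // Draw an empty 60-character progress bar, then backspace to its
        // start so the stars printed below fill it in.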
        int previousPercent = 0;
        int n = 60;
        System.out.print("0% [");
        for (int j = 0; j < n; j++) {
          System.out.print(" ");
        }
        System.out.print("] 100%");
        for (int j = 0; j < n + 6; j++) {
          System.out.print('\b');
        }
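        // Poll the futures: consume completed ones, push pending ones back
        // to the end of the deque.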
        while (!footers.isEmpty()) {
          Future<ParquetMetadata> futureFooter = footers.removeFirst();
          if (!futureFooter.isDone()) {
            footers.addLast(futureFooter);
            continue;
          }
          ParquetMetadata footer = futureFooter.get();
          int currentPercent = (++i * n / statuses.size());
          while (currentPercent > previousPercent) {
            System.out.print("*");
            previousPercent++;
          }
          add(footer);
        }
        System.out.println();
        long t1 = System.currentTimeMillis();
        System.out.println("read all footers in " + (t1 - t0) + " ms");
      } finally {
        threadPool.shutdownNow();
      }
    }
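
    // Aggregate the per-column totals and print the summary.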
    Set<Entry<ColumnDescriptor, ColStats>> entries = stats.entrySet();
    long total = 0;
    long totalUnc = 0;
    for (Entry<ColumnDescriptor, ColStats> entry : entries) {
      ColStats colStats = entry.getValue();
      total += colStats.allStats.total;
      totalUnc += colStats.uncStats.total;
    }
    for (Entry<ColumnDescriptor, ColStats> entry : entries) {
      ColStats colStats = entry.getValue();
      System.out.println(entry.getKey() + " " + percent(colStats.allStats.total, total) + "% of all space " + colStats);
    }
    System.out.println("number of blocks: " + blockCount);
    System.out.println("total data size: " + humanReadable(total) + " (raw " + humanReadable(totalUnc) + ")");
    System.out.println("total records: " + humanReadable(recordCount));
    System.out.println("average block size: " + humanReadable(total / blockCount) + " (raw " + humanReadable(totalUnc / blockCount) + ")");
    System.out.println("average record count: " + humanReadable(recordCount / blockCount));
  }

  private static void add(ParquetMetadata footer) {
    MessageType schema = footer.getFileMetaData().getSchema();
    for (BlockMetaData blockMetaData : footer.getBlocks()) {
      ++blockCount;
      recordCount += blockMetaData.getRowCount();
      List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
      for (ColumnChunkMetaData columnMetaData : columns) {
        ColumnDescriptor desc = schema.getColumnDescription(columnMetaData.getPath().toArray());
        add(
            desc,
            columnMetaData.getValueCount(),
            columnMetaData.getTotalSize(),
            columnMetaData.getTotalUncompressedSize(),
            columnMetaData.getEncodings(),
            columnMetaData.getStatistics());
      }
    }
  }
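
  // Formatting helpers: sizes are reported with metric prefixes and savings
  // as percentages with one decimal place.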
  private static void printTotalString(String message, long total, long totalUnc) {
    System.out.println("total " + message + ": " + humanReadable(total)
        + " (raw " + humanReadable(totalUnc) + " saved " + percentComp(totalUnc, total) + "%)");
  }

  private static float percentComp(long raw, long compressed) {
    return percent(raw - compressed, raw);
  }

  private static float percent(long numerator, long denominator) {
    // Integer math first, then divide by 10 to keep one decimal place.
    return ((float) (numerator * 1000 / denominator)) / 10;
  }

  private static String humanReadable(long size) {
    if (size < 1000) {
      return String.valueOf(size);
    }
    long currentSize = size;
    long previousSize = size;
    int count = 0;
    String[] unit = {"", "K", "M", "G", "T", "P"};
    while (currentSize >= 1000) {
      previousSize = currentSize;
      currentSize = currentSize / 1000;
      ++count;
    }
    // previousSize is the value expressed one unit below the final one, so
    // dividing by 1000 keeps a fractional part (e.g. 1500000 -> "1.5M").
    return ((float) previousSize / 1000) + unit[count];
  }

  private static Map<ColumnDescriptor, ColStats> stats = new LinkedHashMap<ColumnDescriptor, ColStats>();
  private static int blockCount = 0;
  private static long recordCount = 0;
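
  /** Tracks min/max/total of one per-block quantity (a size or a value count). */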
  private static class Stats {
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    long total = 0;

    public void add(long length) {
      min = Math.min(length, min);
      max = Math.max(length, max);
      total += length;
    }

    public String toString(int blocks) {
      return "min: " + humanReadable(min)
          + " max: " + humanReadable(max)
          + " average: " + humanReadable(total / blocks)
          + " total: " + humanReadable(total);
    }
  }
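
  /** Aggregated statistics for a single column across all inspected row groups. */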
  private static class ColStats {
    Stats valueCountStats = new Stats();
    Stats allStats = new Stats();
    Stats uncStats = new Stats();
    Set<Encoding> encodings = new TreeSet<Encoding>();
    Statistics colValuesStats = null;
    int blocks = 0;

    public void add(long valueCount, long size, long uncSize, Collection<Encoding> encodings, Statistics colValuesStats) {
      ++blocks;
      valueCountStats.add(valueCount);
      allStats.add(size);
      uncStats.add(uncSize);
      this.encodings.addAll(encodings);
      this.colValuesStats = colValuesStats;
    }

    @Override
    public String toString() {
      long raw = uncStats.total;
      long compressed = allStats.total;
      return encodings + " " + allStats.toString(blocks)
          + " (raw data: " + humanReadable(raw)
          + (raw == 0 ? "" : " saving " + (raw - compressed) * 100 / raw + "%") + ")\n"
          + " values: " + valueCountStats.toString(blocks) + "\n"
          + " uncompressed: " + uncStats.toString(blocks) + "\n"
          + " column values statistics: " + colValuesStats.toString();
    }
  }

  private static void add(ColumnDescriptor desc, long valueCount, long size, long uncSize,
      Collection<Encoding> encodings, Statistics colValuesStats) {
    ColStats colStats = stats.get(desc);
    if (colStats == null) {
      colStats = new ColStats();
      stats.put(desc, colStats);
    }
    colStats.add(valueCount, size, uncSize, encodings, colValuesStats);
  }
}