/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;

import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.io.Text;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
import org.apache.orc.CompressionKind;
import org.apache.orc.DataMaskDescription;
import org.apache.orc.EncryptionAlgorithm;
import org.apache.orc.EncryptionKey;
import org.apache.orc.EncryptionVariant;
import org.apache.orc.FileFormatException;
import org.apache.orc.FileMetadata;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.OrcUtils;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.StripeInformation;
import org.apache.orc.StripeStatistics;
import org.apache.orc.TypeDescription;
import org.apache.orc.UnknownFormatException;
import org.apache.orc.impl.reader.ReaderEncryption;
import org.apache.orc.impl.reader.ReaderEncryptionVariant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.Key;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
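
/**
 * The ORC implementation of the Reader interface. It reads and caches the
 * file tail and hands out RecordReaders over the file's contents.
 */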
public class ReaderImpl implements Reader {
private static final Logger LOG = LoggerFactory.getLogger(ReaderImpl.class);
private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
private final long maxLength;
protected final Path path;
protected final OrcFile.ReaderOptions options;
private final org.apache.orc.CompressionKind compressionKind;
protected FSDataInputStream file;
protected int bufferSize;
// the unencrypted stripe statistics or null if they haven't been read yet
protected List<OrcProto.StripeStatistics> stripeStatistics;
private final int metadataSize;
private TypeDescription schema;
private final List<OrcProto.UserMetadataItem> userMetadata;
private final List<OrcProto.ColumnStatistics> fileStats;
private final List<StripeInformation> stripes;
protected final int rowIndexStride;
private final long contentLength, numberOfRows;
private final ReaderEncryption encryption;
private long deserializedSize = -1;
protected final Configuration conf;
protected final boolean useUTCTimestamp;
private final List<Integer> versionList;
private final OrcFile.WriterVersion writerVersion;
protected final OrcTail tail;
public static class StripeInformationImpl
implements StripeInformation {
private final long stripeId;
private final long originalStripeId;
private final byte[][] encryptedKeys;
private final OrcProto.StripeInformation stripe;
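    /**
     * Build the stripe information, filling in the encryption stripe id and
     * encrypted local keys from the previous stripe when this one doesn't
     * define them.
     * @param stripe the protobuf stripe information
     * @param stripeId the index of the stripe within the file
     * @param previousOriginalStripeId the original stripe id of the previous stripe
     * @param previousKeys the encrypted local keys of the previous stripe
     */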
public StripeInformationImpl(OrcProto.StripeInformation stripe,
long stripeId,
long previousOriginalStripeId,
byte[][] previousKeys) {
this.stripe = stripe;
this.stripeId = stripeId;
if (stripe.hasEncryptStripeId()) {
originalStripeId = stripe.getEncryptStripeId();
} else {
originalStripeId = previousOriginalStripeId + 1;
}
if (stripe.getEncryptedLocalKeysCount() != 0) {
encryptedKeys = new byte[stripe.getEncryptedLocalKeysCount()][];
for(int v=0; v < encryptedKeys.length; ++v) {
encryptedKeys[v] = stripe.getEncryptedLocalKeys(v).toByteArray();
}
} else {
encryptedKeys = previousKeys;
}
}
@Override
public long getOffset() {
return stripe.getOffset();
}
@Override
public long getLength() {
return stripe.getDataLength() + getIndexLength() + getFooterLength();
}
@Override
public long getDataLength() {
return stripe.getDataLength();
}
@Override
public long getFooterLength() {
return stripe.getFooterLength();
}
@Override
public long getIndexLength() {
return stripe.getIndexLength();
}
@Override
public long getNumberOfRows() {
return stripe.getNumberOfRows();
}
@Override
public long getStripeId() {
return stripeId;
}
@Override
public boolean hasEncryptionStripeId() {
return stripe.hasEncryptStripeId();
}
@Override
public long getEncryptionStripeId() {
return originalStripeId;
}
@Override
public byte[][] getEncryptedLocalKeys() {
return encryptedKeys;
}
@Override
public String toString() {
return "offset: " + getOffset() + " data: " +
getDataLength() + " rows: " + getNumberOfRows() + " tail: " +
getFooterLength() + " index: " + getIndexLength() +
(!hasEncryptionStripeId() || stripeId == originalStripeId - 1
? "" : " encryption id: " + originalStripeId);
}
}
@Override
public long getNumberOfRows() {
return numberOfRows;
}
@Override
public List<String> getMetadataKeys() {
List<String> result = new ArrayList<>();
for(OrcProto.UserMetadataItem item: userMetadata) {
result.add(item.getName());
}
return result;
}
@Override
public ByteBuffer getMetadataValue(String key) {
for(OrcProto.UserMetadataItem item: userMetadata) {
if (item.hasName() && item.getName().equals(key)) {
return item.getValue().asReadOnlyByteBuffer();
}
}
throw new IllegalArgumentException("Can't find user metadata " + key);
}
@Override
public boolean hasMetadataValue(String key) {
for(OrcProto.UserMetadataItem item: userMetadata) {
if (item.hasName() && item.getName().equals(key)) {
return true;
}
}
return false;
}
@Override
public org.apache.orc.CompressionKind getCompressionKind() {
return compressionKind;
}
@Override
public int getCompressionSize() {
return bufferSize;
}
@Override
public List<StripeInformation> getStripes() {
return stripes;
}
@Override
public long getContentLength() {
return contentLength;
}
@Override
public List<OrcProto.Type> getTypes() {
return OrcUtils.getOrcTypes(schema);
}
public static OrcFile.Version getFileVersion(List<Integer> versionList) {
if (versionList == null || versionList.isEmpty()) {
return OrcFile.Version.V_0_11;
}
for (OrcFile.Version version: OrcFile.Version.values()) {
if (version.getMajor() == versionList.get(0) &&
version.getMinor() == versionList.get(1)) {
return version;
}
}
return OrcFile.Version.FUTURE;
}
@Override
public OrcFile.Version getFileVersion() {
return getFileVersion(versionList);
}
@Override
public OrcFile.WriterVersion getWriterVersion() {
return writerVersion;
}
@Override
public OrcProto.FileTail getFileTail() {
return tail.getFileTail();
}
@Override
public EncryptionKey[] getColumnEncryptionKeys() {
return encryption.getKeys();
}
@Override
public DataMaskDescription[] getDataMasks() {
return encryption.getMasks();
}
@Override
public ReaderEncryptionVariant[] getEncryptionVariants() {
return encryption.getVariants();
}
@Override
  public List<StripeStatistics> getVariantStripeStatistics(
      EncryptionVariant variant) throws IOException {
if (variant == null) {
if (stripeStatistics == null) {
try (CompressionCodec codec = OrcCodecPool.getCodec(compressionKind)) {
InStream.StreamOptions options = new InStream.StreamOptions();
if (codec != null) {
options.withCodec(codec).withBufferSize(bufferSize);
}
// deserialize the unencrypted stripe statistics
stripeStatistics = deserializeStripeStats(tail.getTailBuffer(),
tail.getMetadataOffset(), tail.getMetadataSize(), options);
}
}
return convertFromProto(stripeStatistics);
} else {
try (CompressionCodec codec = OrcCodecPool.getCodec(compressionKind)) {
InStream.StreamOptions compression = new InStream.StreamOptions();
if (codec != null) {
compression.withCodec(codec).withBufferSize(bufferSize);
}
return ((ReaderEncryptionVariant) variant).getStripeStatistics(null,
compression, this);
}
}
}
/**
* Internal access to our view of the encryption.
* @return the encryption information for this reader.
*/
public ReaderEncryption getEncryption() {
return encryption;
}
@Override
public int getRowIndexStride() {
return rowIndexStride;
}
@Override
public ColumnStatistics[] getStatistics() {
ColumnStatistics[] result = deserializeStats(schema, fileStats);
if (encryption.getKeys().length > 0) {
try (CompressionCodec codec = OrcCodecPool.getCodec(compressionKind)) {
InStream.StreamOptions compression = InStream.options();
if (codec != null) {
compression.withCodec(codec).withBufferSize(bufferSize);
}
for (int c = schema.getId(); c <= schema.getMaximumId(); ++c) {
ReaderEncryptionVariant variant = encryption.getVariant(c);
if (variant != null) {
try {
int base = variant.getRoot().getId();
ColumnStatistics[] overrides = decryptFileStats(variant,
compression, tail.getFooter());
for(int sub=0; sub < overrides.length; ++sub) {
result[base + sub] = overrides[sub];
}
} catch (IOException e) {
throw new RuntimeException("Can't decrypt file stats for " + path +
" with " + variant.getKeyDescription());
}
}
}
}
}
return result;
}
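  /**
   * Decrypt and deserialize the file statistics for an encryption variant.
   * @param encryption the variant to read
   * @param compression the compression options for the stream
   * @param footer the file footer with the encrypted statistics
   * @return the decrypted statistics, or null if we don't have the key
   */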
private ColumnStatistics[] decryptFileStats(ReaderEncryptionVariant encryption,
InStream.StreamOptions compression,
OrcProto.Footer footer
) throws IOException {
Key key = encryption.getFileFooterKey();
if (key == null) {
return null;
} else {
OrcProto.EncryptionVariant protoVariant =
footer.getEncryption().getVariants(encryption.getVariantId());
byte[] bytes = protoVariant.getFileStatistics().toByteArray();
BufferChunk buffer = new BufferChunk(ByteBuffer.wrap(bytes), 0);
EncryptionAlgorithm algorithm = encryption.getKeyDescription().getAlgorithm();
byte[] iv = new byte[algorithm.getIvLength()];
CryptoUtils.modifyIvForStream(encryption.getRoot().getId(),
OrcProto.Stream.Kind.FILE_STATISTICS, footer.getStripesCount() + 1)
.accept(iv);
InStream.StreamOptions options = new InStream.StreamOptions(compression)
.withEncryption(algorithm, key, iv);
InStream in = InStream.create("encrypted file stats", buffer,
0, bytes.length, options);
OrcProto.FileStatistics decrypted = OrcProto.FileStatistics.parseFrom(in);
ColumnStatistics[] result = new ColumnStatistics[decrypted.getColumnCount()];
TypeDescription root = encryption.getRoot();
      for(int i = 0; i < result.length; ++i) {
result[i] = ColumnStatisticsImpl.deserialize(root.findSubtype(root.getId() + i),
decrypted.getColumn(i), writerUsedProlepticGregorian(),
getConvertToProlepticGregorian());
}
return result;
}
}
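  /**
   * Deserialize the protobuf column statistics into their Java objects.
   * @param schema the file schema, or null if it isn't known
   * @param fileStats the serialized statistics, one entry per column
   * @return the deserialized column statistics
   */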
public ColumnStatistics[] deserializeStats(
TypeDescription schema,
List<OrcProto.ColumnStatistics> fileStats) {
ColumnStatistics[] result = new ColumnStatistics[fileStats.size()];
for(int i=0; i < result.length; ++i) {
TypeDescription subschema = schema == null ? null : schema.findSubtype(i);
result[i] = ColumnStatisticsImpl.deserialize(subschema, fileStats.get(i),
writerUsedProlepticGregorian(),
getConvertToProlepticGregorian());
}
return result;
}
@Override
public TypeDescription getSchema() {
return schema;
}
/**
* Ensure this is an ORC file to prevent users from trying to read text
* files or RC files as ORC files.
* @param in the file being read
* @param path the filename for error messages
* @param psLen the postscript length
* @param buffer the tail of the file
*/
protected static void ensureOrcFooter(FSDataInputStream in,
Path path,
int psLen,
ByteBuffer buffer) throws IOException {
int magicLength = OrcFile.MAGIC.length();
int fullLength = magicLength + 1;
if (psLen < fullLength || buffer.remaining() < fullLength) {
throw new FileFormatException("Malformed ORC file " + path +
". Invalid postscript length " + psLen);
}
int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - fullLength;
byte[] array = buffer.array();
// now look for the magic string at the end of the postscript.
if (!Text.decode(array, offset, magicLength).equals(OrcFile.MAGIC)) {
// If it isn't there, this may be the 0.11.0 version of ORC.
// Read the first 3 bytes of the file to check for the header
byte[] header = new byte[magicLength];
in.readFully(0, header, 0, magicLength);
// if it isn't there, this isn't an ORC file
      if (!Text.decode(header, 0, magicLength).equals(OrcFile.MAGIC)) {
throw new FileFormatException("Malformed ORC file " + path +
". Invalid postscript.");
}
}
}
/**
* Build a version string out of an array.
* @param version the version number as a list
* @return the human readable form of the version string
*/
private static String versionString(List<Integer> version) {
StringBuilder buffer = new StringBuilder();
for(int i=0; i < version.size(); ++i) {
if (i != 0) {
buffer.append('.');
}
buffer.append(version.get(i));
}
return buffer.toString();
}
/**
   * Check to see if this ORC file is from a future version, and if so,
   * fail, because we may not be able to read all of the column encodings.
* @param path the data source path for error messages
* @param postscript the parsed postscript
*/
protected static void checkOrcVersion(Path path,
OrcProto.PostScript postscript
) throws IOException {
List<Integer> version = postscript.getVersionList();
if (getFileVersion(version) == OrcFile.Version.FUTURE) {
throw new UnknownFormatException(path, versionString(version),
postscript);
}
}
/**
   * Constructor that lets the user specify additional options.
* @param path pathname for file
* @param options options for reading
*/
public ReaderImpl(Path path, OrcFile.ReaderOptions options) throws IOException {
this.path = path;
this.options = options;
this.conf = options.getConfiguration();
this.maxLength = options.getMaxLength();
this.useUTCTimestamp = options.getUseUTCTimestamp();
FileMetadata fileMetadata = options.getFileMetadata();
if (fileMetadata != null) {
this.compressionKind = fileMetadata.getCompressionKind();
this.bufferSize = fileMetadata.getCompressionBufferSize();
this.metadataSize = fileMetadata.getMetadataSize();
this.stripeStatistics = fileMetadata.getStripeStats();
this.versionList = fileMetadata.getVersionList();
OrcFile.WriterImplementation writer =
OrcFile.WriterImplementation.from(fileMetadata.getWriterImplementation());
this.writerVersion =
OrcFile.WriterVersion.from(writer, fileMetadata.getWriterVersionNum());
List<OrcProto.Type> types = fileMetadata.getTypes();
OrcUtils.isValidTypeTree(types, 0);
this.schema = OrcUtils.convertTypeFromProtobuf(types, 0);
this.rowIndexStride = fileMetadata.getRowIndexStride();
this.contentLength = fileMetadata.getContentLength();
this.numberOfRows = fileMetadata.getNumberOfRows();
this.fileStats = fileMetadata.getFileStats();
this.stripes = fileMetadata.getStripes();
this.tail = null;
this.userMetadata = null; // not cached and not needed here
// FileMetadata is obsolete and doesn't support encryption
this.encryption = new ReaderEncryption();
} else {
OrcTail orcTail = options.getOrcTail();
if (orcTail == null) {
tail = extractFileTail(getFileSystem(), path, options.getMaxLength());
options.orcTail(tail);
} else {
checkOrcVersion(path, orcTail.getPostScript());
tail = orcTail;
}
this.compressionKind = tail.getCompressionKind();
this.bufferSize = tail.getCompressionBufferSize();
this.metadataSize = tail.getMetadataSize();
this.versionList = tail.getPostScript().getVersionList();
this.schema = tail.getSchema();
this.rowIndexStride = tail.getFooter().getRowIndexStride();
this.contentLength = tail.getFooter().getContentLength();
this.numberOfRows = tail.getFooter().getNumberOfRows();
this.userMetadata = tail.getFooter().getMetadataList();
this.fileStats = tail.getFooter().getStatisticsList();
this.writerVersion = tail.getWriterVersion();
this.stripes = tail.getStripes();
this.stripeStatistics = null;
      this.encryption = new ReaderEncryption(tail.getFooter(), schema,
          tail.getStripeStatisticsOffset(), tail.getTailBuffer(), stripes,
          options.getKeyProvider(), conf);
}
}
protected FileSystem getFileSystem() throws IOException {
FileSystem fileSystem = options.getFilesystem();
if (fileSystem == null) {
fileSystem = path.getFileSystem(options.getConfiguration());
options.filesystem(fileSystem);
}
return fileSystem;
}
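  /**
   * Get a supplier that lazily opens the file system, wrapping any
   * IOException in a RuntimeException.
   * @return the file system supplier
   */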
protected Supplier<FileSystem> getFileSystemSupplier() {
return () -> {
try {
return getFileSystem();
} catch (IOException e) {
throw new RuntimeException("Can't create filesystem", e);
}
};
}
/**
* Get the WriterVersion based on the ORC file postscript.
* @param writerVersion the integer writer version
* @return the version of the software that produced the file
*/
public static OrcFile.WriterVersion getWriterVersion(int writerVersion) {
for(OrcFile.WriterVersion version: OrcFile.WriterVersion.values()) {
if (version.getId() == writerVersion) {
return version;
}
}
return OrcFile.WriterVersion.FUTURE;
}
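  /**
   * Deserialize the metadata section from the file tail.
   * @param bb the buffer containing the serialized tail
   * @param metadataAbsPos the absolute position of the metadata in the buffer
   * @param metadataSize the number of bytes of metadata
   * @param options the stream options, including the compression codec
   * @return the deserialized metadata
   */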
public static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
int metadataSize, InStream.StreamOptions options) throws IOException {
bb.position(metadataAbsPos);
bb.limit(metadataAbsPos + metadataSize);
return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream(
InStream.create("metadata", new BufferChunk(bb, 0), 0, metadataSize, options)));
}
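  /**
   * Deserialize the postscript from the end of the file and validate its
   * version and compression codec.
   * @param buffer the tail of the file
   * @param path the file name for error messages
   * @param psLen the length of the postscript in bytes
   * @param psOffset the offset of the postscript within the file
   * @return the deserialized postscript
   */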
private static OrcProto.PostScript extractPostScript(BufferChunk buffer,
Path path,
int psLen,
long psOffset
) throws IOException {
CodedInputStream in = InStream.createCodedInputStream(
InStream.create("ps", buffer, psOffset, psLen));
OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
checkOrcVersion(path, ps);
// Check compression codec.
switch (ps.getCompression()) {
case NONE:
case ZLIB:
case SNAPPY:
case LZO:
case LZ4:
case ZSTD:
break;
default:
throw new IllegalArgumentException("Unknown compression");
}
return ps;
}
/**
* Build a virtual OrcTail for empty files.
* @return a new OrcTail
*/
OrcTail buildEmptyTail() throws IOException {
OrcProto.PostScript.Builder postscript = OrcProto.PostScript.newBuilder();
OrcFile.Version version = OrcFile.Version.CURRENT;
postscript.setMagic(OrcFile.MAGIC)
.setCompression(OrcProto.CompressionKind.NONE)
.setFooterLength(0)
.addVersion(version.getMajor())
.addVersion(version.getMinor())
.setMetadataLength(0)
.setWriterVersion(OrcFile.CURRENT_WRITER.getId());
// Use a struct with no fields
OrcProto.Type.Builder struct = OrcProto.Type.newBuilder();
struct.setKind(OrcProto.Type.Kind.STRUCT);
OrcProto.Footer.Builder footer = OrcProto.Footer.newBuilder();
footer.setHeaderLength(0)
.setContentLength(0)
.addTypes(struct)
.setNumberOfRows(0)
.setRowIndexStride(0);
OrcProto.FileTail.Builder result = OrcProto.FileTail.newBuilder();
result.setFooter(footer);
result.setPostscript(postscript);
result.setFileLength(0);
result.setPostscriptLength(0);
return new OrcTail(result.build(), new BufferChunk(0, 0), -1);
}
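  /**
   * Read the data for each buffer chunk in the list that doesn't already
   * have its bytes.
   * @param file the file to read from
   * @param chunks the head of the chunk list to fill in
   */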
private static void read(FSDataInputStream file,
BufferChunk chunks) throws IOException {
while (chunks != null) {
if (!chunks.hasData()) {
int len = chunks.getLength();
ByteBuffer bb = ByteBuffer.allocate(len);
file.readFully(chunks.getOffset(), bb.array(), bb.arrayOffset(), len);
chunks.setChunk(bb);
}
chunks = (BufferChunk) chunks.next;
}
}
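  /**
   * Read and deserialize the file tail (postscript and footer), reading more
   * bytes if the tail is larger than DIRECTORY_SIZE_GUESS.
   * @param fs the file system to read from
   * @param path the file to read
   * @param maxFileLength the user-supplied file length, or Long.MAX_VALUE to
   *                      use the file system's length
   * @return the deserialized file tail
   */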
protected OrcTail extractFileTail(FileSystem fs, Path path,
long maxFileLength) throws IOException {
BufferChunk buffer;
OrcProto.PostScript ps;
OrcProto.FileTail.Builder fileTailBuilder = OrcProto.FileTail.newBuilder();
long modificationTime;
file = fs.open(path);
try {
// figure out the size of the file using the option or filesystem
long size;
if (maxFileLength == Long.MAX_VALUE) {
FileStatus fileStatus = fs.getFileStatus(path);
size = fileStatus.getLen();
modificationTime = fileStatus.getModificationTime();
} else {
size = maxFileLength;
modificationTime = -1;
}
if (size == 0) {
// Hive often creates empty files (including ORC) and has an
// optimization to create a 0 byte file as an empty ORC file.
return buildEmptyTail();
} else if (size <= OrcFile.MAGIC.length()) {
        // Anything smaller than the MAGIC header cannot be valid (valid ORC
        // files are at least about 40 bytes; this check is more conservative).
throw new FileFormatException("Not a valid ORC file " + path
+ " (maxFileLength= " + maxFileLength + ")");
}
fileTailBuilder.setFileLength(size);
//read last bytes into buffer to get PostScript
int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
buffer = new BufferChunk(size - readSize, readSize);
read(file, buffer);
//read the PostScript
//get length of PostScript
ByteBuffer bb = buffer.getData();
int psLen = bb.get(readSize - 1) & 0xff;
ensureOrcFooter(file, path, psLen, bb);
long psOffset = size - 1 - psLen;
ps = extractPostScript(buffer, path, psLen, psOffset);
CompressionKind compressionKind =
CompressionKind.valueOf(ps.getCompression().name());
fileTailBuilder.setPostscriptLength(psLen).setPostscript(ps);
int footerSize = (int) ps.getFooterLength();
int metadataSize = (int) ps.getMetadataLength();
int stripeStatSize = (int) ps.getStripeStatisticsLength();
//check if extra bytes need to be read
int tailSize = 1 + psLen + footerSize + metadataSize + stripeStatSize;
int extra = Math.max(0, tailSize - readSize);
if (extra > 0) {
//more bytes need to be read, seek back to the right place and read extra bytes
BufferChunk orig = buffer;
buffer = new BufferChunk(size - tailSize, extra);
buffer.next = orig;
orig.prev = buffer;
read(file, buffer);
}
InStream.StreamOptions compression = new InStream.StreamOptions();
try (CompressionCodec codec = OrcCodecPool.getCodec(compressionKind)) {
if (codec != null) {
compression.withCodec(codec)
.withBufferSize((int) ps.getCompressionBlockSize());
}
OrcProto.Footer footer =
OrcProto.Footer.parseFrom(
InStream.createCodedInputStream(
InStream.create("footer", buffer, psOffset - footerSize,
footerSize, compression)));
fileTailBuilder.setFooter(footer);
}
} catch (Throwable thr) {
try {
close();
} catch (IOException except) {
LOG.info("Ignoring secondary exception in close of " + path, except);
}
throw thr instanceof IOException ? (IOException) thr :
new IOException("Problem reading file footer " + path, thr);
}
return new OrcTail(fileTailBuilder.build(), buffer, modificationTime);
}
@Override
public ByteBuffer getSerializedFileFooter() {
return tail.getSerializedTail();
}
@Override
public boolean writerUsedProlepticGregorian() {
OrcProto.Footer footer = tail.getFooter();
return footer.hasCalendar()
? footer.getCalendar() == OrcProto.CalendarKind.PROLEPTIC_GREGORIAN
: OrcConf.PROLEPTIC_GREGORIAN_DEFAULT.getBoolean(conf);
}
@Override
public boolean getConvertToProlepticGregorian() {
return options.getConvertToProlepticGregorian();
}
@Override
public Options options() {
return new Options(conf);
}
@Override
public RecordReader rows() throws IOException {
return rows(options());
}
@Override
public RecordReader rows(Options options) throws IOException {
LOG.info("Reading ORC rows from " + path + " with " + options);
return new RecordReaderImpl(this, options);
}
@Override
public long getRawDataSize() {
    // Compute the deserialized size lazily and cache it; the value is derived
    // from the footer statistics, so it doesn't change for a given file.
if (deserializedSize == -1) {
List<Integer> indices = new ArrayList<>();
for (int i = 0; i < fileStats.size(); ++i) {
indices.add(i);
}
deserializedSize = getRawDataSizeFromColIndices(indices);
}
return deserializedSize;
}
@Override
public long getRawDataSizeFromColIndices(List<Integer> colIndices) {
boolean[] include = new boolean[schema.getMaximumId() + 1];
for(Integer rootId: colIndices) {
TypeDescription root = schema.findSubtype(rootId);
for(int c = root.getId(); c <= root.getMaximumId(); ++c) {
include[c] = true;
}
}
return getRawDataSizeFromColIndices(include, schema, fileStats);
}
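  /**
   * Sum the estimated deserialized sizes of the selected columns.
   * @param include which column ids should be included in the sum
   * @param schema the file schema
   * @param stats the file level column statistics
   * @return the estimated deserialized size in bytes
   */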
static long getRawDataSizeFromColIndices(boolean[] include,
TypeDescription schema,
List<OrcProto.ColumnStatistics> stats) {
long result = 0;
for (int c = schema.getId(); c <= schema.getMaximumId(); ++c) {
if (include[c]) {
result += getRawDataSizeOfColumn(schema.findSubtype(c), stats);
}
}
return result;
}
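  /**
   * Estimate the deserialized size of a single column using the Java data
   * model and the column's statistics.
   * @param column the type of the column
   * @param stats the file level column statistics
   * @return the estimated size in bytes
   */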
private static long getRawDataSizeOfColumn(TypeDescription column,
List<OrcProto.ColumnStatistics> stats) {
OrcProto.ColumnStatistics colStat = stats.get(column.getId());
long numVals = colStat.getNumberOfValues();
switch (column.getCategory()) {
case BINARY:
        // The old ORC format doesn't support binary statistics. We don't need
        // to check for their presence; protocol buffers default the sum to 0.
return colStat.getBinaryStatistics().getSum();
case STRING:
case CHAR:
case VARCHAR:
        // The old ORC format doesn't support a sum for string statistics, and
        // protocol buffers default it to 0, so no presence check is needed.
        // ORC strings are deserialized to Java strings, so use the Java data
        // model's string size.
numVals = numVals == 0 ? 1 : numVals;
int avgStrLen = (int) (colStat.getStringStatistics().getSum() / numVals);
return numVals * JavaDataModel.get().lengthForStringOfLength(avgStrLen);
case TIMESTAMP:
case TIMESTAMP_INSTANT:
return numVals * JavaDataModel.get().lengthOfTimestamp();
case DATE:
return numVals * JavaDataModel.get().lengthOfDate();
case DECIMAL:
return numVals * JavaDataModel.get().lengthOfDecimal();
case DOUBLE:
case LONG:
return numVals * JavaDataModel.get().primitive2();
case FLOAT:
case INT:
case SHORT:
case BOOLEAN:
case BYTE:
case STRUCT:
case UNION:
case MAP:
case LIST:
return numVals * JavaDataModel.get().primitive1();
default:
LOG.debug("Unknown primitive category: " + column.getCategory());
break;
}
return 0;
}
@Override
public long getRawDataSizeOfColumns(List<String> colNames) {
boolean[] include = new boolean[schema.getMaximumId() + 1];
for(String name: colNames) {
TypeDescription sub = schema.findSubtype(name);
for(int c = sub.getId(); c <= sub.getMaximumId(); ++c) {
include[c] = true;
}
}
return getRawDataSizeFromColIndices(include, schema, fileStats);
}
@Override
public List<OrcProto.StripeStatistics> getOrcProtoStripeStatistics() {
if (stripeStatistics == null) {
try (CompressionCodec codec = OrcCodecPool.getCodec(compressionKind)) {
InStream.StreamOptions options = new InStream.StreamOptions();
if (codec != null) {
options.withCodec(codec).withBufferSize(bufferSize);
}
stripeStatistics = deserializeStripeStats(tail.getTailBuffer(),
tail.getMetadataOffset(), tail.getMetadataSize(), options);
} catch (IOException ioe) {
throw new RuntimeException("Can't deserialize stripe stats", ioe);
}
}
return stripeStatistics;
}
@Override
public List<OrcProto.ColumnStatistics> getOrcProtoFileStatistics() {
return fileStats;
}
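  /**
   * Deserialize the unencrypted stripe statistics from the metadata section.
   * @param tailBuffer the buffer with the serialized file tail
   * @param offset the offset of the metadata section in the buffer
   * @param length the length of the metadata section
   * @param options the stream options, including the compression codec
   * @return the stripe statistics in protobuf form
   */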
private static
List<OrcProto.StripeStatistics> deserializeStripeStats(BufferChunk tailBuffer,
long offset,
int length,
InStream.StreamOptions options
) throws IOException {
InStream stream = InStream.create("stripe stats", tailBuffer, offset,
length, options);
OrcProto.Metadata meta = OrcProto.Metadata.parseFrom(
InStream.createCodedInputStream(stream));
return meta.getStripeStatsList();
}
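  /**
   * Convert the protobuf stripe statistics into their Java objects.
   * @param list the protobuf stripe statistics, or null
   * @return the converted statistics, or null if the input was null
   */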
private List<StripeStatistics> convertFromProto(List<OrcProto.StripeStatistics> list) {
if (list == null) {
return null;
} else {
List<StripeStatistics> result = new ArrayList<>(list.size());
      for (OrcProto.StripeStatistics ss : list) {
result.add(new StripeStatisticsImpl(schema,
new ArrayList<>(ss.getColStatsList()), writerUsedProlepticGregorian(),
getConvertToProlepticGregorian()));
}
return result;
}
}
@Override
public List<StripeStatistics> getStripeStatistics() throws IOException {
return getStripeStatistics(null);
}
@Override
public List<StripeStatistics> getStripeStatistics(boolean[] included) throws IOException {
List<StripeStatistics> result = convertFromProto(stripeStatistics);
if (result == null || encryption.getVariants().length > 0) {
try (CompressionCodec codec = OrcCodecPool.getCodec(compressionKind)) {
InStream.StreamOptions options = new InStream.StreamOptions();
if (codec != null) {
options.withCodec(codec).withBufferSize(bufferSize);
}
result = getVariantStripeStatistics(null);
if (encryption.getVariants().length > 0) {
// process any encrypted overrides that we have the key for
for (int c = schema.getId(); c <= schema.getMaximumId(); ++c) {
// only decrypt the variants that we need
if (included == null || included[c]) {
ReaderEncryptionVariant variant = encryption.getVariant(c);
if (variant != null) {
TypeDescription variantType = variant.getRoot();
List<StripeStatistics> colStats =
variant.getStripeStatistics(included, options, this);
for(int sub = c; sub <= variantType.getMaximumId(); ++sub) {
if (included == null || included[sub]) {
for(int s = 0; s < colStats.size(); ++s) {
StripeStatisticsImpl resultElem = (StripeStatisticsImpl) result.get(s);
resultElem.updateColumn(sub, colStats.get(s).getColumn(sub - variantType.getId()));
}
}
}
c = variantType.getMaximumId();
}
}
}
}
}
}
return result;
}
@Override
public List<Integer> getVersionList() {
return versionList;
}
@Override
public int getMetadataSize() {
return metadataSize;
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append("ORC Reader(");
buffer.append(path);
if (maxLength != -1) {
buffer.append(", ");
buffer.append(maxLength);
}
buffer.append(")");
return buffer.toString();
}
@Override
public void close() throws IOException {
if (file != null) {
file.close();
}
}
/**
* Take the file from the reader.
* This allows the first RecordReader to use the same file, but additional
* RecordReaders will open new handles.
* @return a file handle, if one is available
*/
public FSDataInputStream takeFile() {
FSDataInputStream result = file;
file = null;
return result;
}
}