/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.orc;

import com.google.common.collect.Lists;
import com.google.protobuf.CodedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.orc.*;
import org.apache.orc.Reader.Options;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.impl.InStream;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.thirdparty.orc.OrcRecordReader;
import org.apache.tajo.storage.thirdparty.orc.OrcUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
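
/**
 * A Tajo {@link FileScanner} for the ORC file format. It reads the file tail
 * (postscript, footer, and metadata) eagerly in {@link #init()} and then
 * delegates row-by-row reading to an {@link OrcRecordReader}.
 *
 * <p>A minimal usage sketch (assuming {@code conf}, {@code schema}, {@code meta},
 * and {@code fragment} have been prepared by the caller; {@code next()} returns
 * {@code null} once the fragment is exhausted):
 * <pre>{@code
 * OrcScanner scanner = new OrcScanner(conf, schema, meta, fragment);
 * scanner.init();
 * Tuple tuple;
 * while ((tuple = scanner.next()) != null) {
 *   // process tuple
 * }
 * scanner.close();
 * }</pre>
 */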
public class OrcScanner extends FileScanner {
  private static final Log LOG = LogFactory.getLog(OrcScanner.class);

  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;

  protected final FileSystem fileSystem;
  private final long maxLength = Long.MAX_VALUE;
  protected final Path path;
  protected org.apache.orc.CompressionKind compressionKind;
  protected CompressionCodec codec;
  protected int bufferSize;
  private List<OrcProto.StripeStatistics> stripeStats;
  private int metadataSize;
  protected List<OrcProto.Type> types;
  private List<OrcProto.UserMetadataItem> userMetadata;
  private List<OrcProto.ColumnStatistics> fileStats;
  private List<StripeInformation> stripes;
  protected int rowIndexStride;
  private long contentLength, numberOfRows;
  private List<Integer> versionList;

  // Serialized footer - keeping this around for use by getFileMetaInfo()
  // helps avoid CPU cycles spent in deserializing, at the cost of an
  // increased memory footprint.
  private ByteBuffer footerByteBuffer;
  // Same for the metastore cache - maintains the same backing buffer, but includes the postscript.
  // This will only be set if the file footer/metadata was read from disk.
  private ByteBuffer footerMetaAndPsBuffer;

  private OrcRecordReader recordReader;
  private long recordCount = 0;

  /**
   * Ensure this is an ORC file to prevent users from trying to read text
   * files or RC files as ORC files.
   * @param in the file being read
   * @param path the filename for error messages
   * @param psLen the postscript length
   * @param buffer the tail of the file
   * @throws IOException if the file is malformed or cannot be read
   */
  static void ensureOrcFooter(FSDataInputStream in,
                              Path path,
                              int psLen,
                              ByteBuffer buffer) throws IOException {
    int len = OrcFile.MAGIC.length();
    if (psLen < len + 1) {
      throw new IOException("Malformed ORC file " + path +
          ". Invalid postscript length " + psLen);
    }
    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len;
    byte[] array = buffer.array();
    // now look for the magic string at the end of the postscript.
    if (!Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
      // If it isn't there, this may be the 0.11.0 version of ORC.
      // Read the first 3 bytes of the file to check for the header.
      byte[] header = new byte[len];
      in.readFully(0, header, 0, len);
      // if it isn't there, this isn't an ORC file
      if (!Text.decode(header, 0, len).equals(OrcFile.MAGIC)) {
        throw new IOException("Malformed ORC file " + path +
            ". Invalid postscript.");
      }
    }
  }

  /**
   * Build a version string out of an array.
   * @param version the version number as a list
   * @return the human readable form of the version string
   */
  private static String versionString(List<Integer> version) {
    StringBuilder buffer = new StringBuilder();
    for (int i = 0; i < version.size(); ++i) {
      if (i != 0) {
        buffer.append('.');
      }
      buffer.append(version.get(i));
    }
    return buffer.toString();
  }

  /**
   * Check to see if this ORC file is from a future version and, if so,
   * warn the user that we may not be able to read all of the column encodings.
   * @param log the logger to write any error message to
   * @param path the data source path for error messages
   * @param version the version of Hive that wrote the file
   */
  static void checkOrcVersion(Log log, Path path, List<Integer> version) {
    if (version.size() >= 1) {
      int major = version.get(0);
      int minor = 0;
      if (version.size() >= 2) {
        minor = version.get(1);
      }
      if (major > OrcFile.Version.CURRENT.getMajor() ||
          (major == OrcFile.Version.CURRENT.getMajor() &&
           minor > OrcFile.Version.CURRENT.getMinor())) {
        log.warn(path + " was written by a future Hive version " +
            versionString(version) +
            ". This file may not be readable by this version of Hive.");
      }
    }
  }

  public OrcScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
    super(conf, schema, meta, fragment);

    this.path = this.fragment.getPath();
    this.fileSystem = this.path.getFileSystem(conf);
  }
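
  // ORC lays out the file tail as (see the ORC file format spec):
  //   ... stripes ... | metadata | footer | postscript | 1-byte postscript length
  // so a single read of the last DIRECTORY_SIZE_GUESS bytes is usually enough to
  // recover the whole tail; only unusually large footers need a second read.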
  private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
                                                        Path path,
                                                        long maxFileLength
                                                        ) throws IOException {
    FSDataInputStream file = fs.open(path);

    // figure out the size of the file using the option or filesystem
    long size;
    if (maxFileLength == Long.MAX_VALUE) {
      size = fs.getFileStatus(path).getLen();
    } else {
      size = maxFileLength;
    }

    //read last bytes into buffer to get PostScript
    int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
    ByteBuffer buffer = ByteBuffer.allocate(readSize);
    assert buffer.position() == 0;
    file.readFully((size - readSize),
        buffer.array(), buffer.arrayOffset(), readSize);
    buffer.position(0);

    //read the PostScript
    //get length of PostScript
    int psLen = buffer.get(readSize - 1) & 0xff;
    ensureOrcFooter(file, path, psLen, buffer);
    int psOffset = readSize - 1 - psLen;
    OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset);

    int footerSize = (int) ps.getFooterLength();
    int metadataSize = (int) ps.getMetadataLength();

    //check if extra bytes need to be read
    ByteBuffer fullFooterBuffer = null;
    int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
    if (extra > 0) {
      //more bytes need to be read, seek back to the right place and read extra bytes
      ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
      file.readFully((size - readSize - extra), extraBuf.array(),
          extraBuf.arrayOffset() + extraBuf.position(), extra);
      extraBuf.position(extra);
      //append with already read bytes
      extraBuf.put(buffer);
      buffer = extraBuf;
      buffer.position(0);
      fullFooterBuffer = buffer.slice();
      buffer.limit(footerSize + metadataSize);
    } else {
      //footer is already in the bytes in buffer, just adjust position, length
      buffer.position(psOffset - footerSize - metadataSize);
      fullFooterBuffer = buffer.slice();
      buffer.limit(psOffset);
    }

    // remember position for later
    buffer.mark();
    file.close();

    return new FileMetaInfo(
        ps.getCompression().toString(),
        (int) ps.getCompressionBlockSize(),
        (int) ps.getMetadataLength(),
        buffer,
        ps.getVersionList(),
        org.apache.orc.OrcFile.WriterVersion.FUTURE,
        fullFooterBuffer
        );
  }

  public OrcRecordReader createRecordReader() throws IOException {
    return new OrcRecordReader(this.stripes, fileSystem, schema, targets, fragment, types, codec, bufferSize,
        rowIndexStride, buildReaderOptions(meta), conf,
        TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE)));
  }
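
  // Reader options are taken from the table properties when present, falling
  // back to the ORC library defaults for zero-copy reads and corrupt-record skipping.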
  private static Options buildReaderOptions(TableMeta meta) {
    return new Options()
        .useZeroCopy(Boolean.parseBoolean(meta.getProperty(OrcConf.USE_ZEROCOPY.getAttribute(),
            String.valueOf(OrcConf.USE_ZEROCOPY.getDefaultValue()))))
        .skipCorruptRecords(Boolean.parseBoolean(meta.getProperty(OrcConf.SKIP_CORRUPT_DATA.getAttribute(),
            String.valueOf(OrcConf.SKIP_CORRUPT_DATA.getDefaultValue()))));
  }
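
  // init() reads the file tail once, caches the deserialized footer and metadata
  // on this scanner, and then builds the record reader that does the actual scan.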
  @Override
  public void init() throws IOException {
    FileMetaInfo footerMetaData = extractMetaInfoFromFooter(fileSystem, path, maxLength);
    this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
    MetaInfoObjExtractor rInfo =
        new MetaInfoObjExtractor(footerMetaData.compressionType,
            footerMetaData.bufferSize,
            footerMetaData.metadataSize,
            footerMetaData.footerBuffer
            );
    this.footerByteBuffer = footerMetaData.footerBuffer;
    this.compressionKind = rInfo.compressionKind;
    this.codec = rInfo.codec;
    this.bufferSize = rInfo.bufferSize;
    this.metadataSize = rInfo.metadataSize;
    this.stripeStats = rInfo.metadata.getStripeStatsList();
    this.types = rInfo.footer.getTypesList();
    this.rowIndexStride = rInfo.footer.getRowIndexStride();
    this.contentLength = rInfo.footer.getContentLength();
    this.numberOfRows = rInfo.footer.getNumberOfRows();
    this.userMetadata = rInfo.footer.getMetadataList();
    this.fileStats = rInfo.footer.getStatisticsList();
    this.versionList = footerMetaData.versionList;
    this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());

    recordReader = createRecordReader();

    super.init();
  }

  @Override
  public Tuple next() throws IOException {
    Tuple next = recordReader.next();
    if (next != null) {
      recordCount++;
    }
    return next;
  }

  @Override
  public void reset() throws IOException {
    // TODO: improve this
    this.close();
    recordReader = createRecordReader();
  }

  @Override
  public void close() throws IOException {
    if (recordReader != null) {
      recordReader.close();
      tableStats.setNumBytes(recordReader.getNumBytes());
      tableStats.setNumRows(recordCount);
    }
  }

  @Override
  public boolean isProjectable() {
    return true;
  }

  @Override
  public boolean isSelectable() {
    return false;
  }

  @Override
  public void setFilter(EvalNode filter) {
    // TODO: implement this
  }

  @Override
  public float getProgress() {
    return inited ? recordReader.getProgress() : super.getProgress();
  }

  @Override
  public boolean isSplittable() {
    return true;
  }
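
  // The postscript itself is never compressed (the codec is not known until the
  // postscript has been read), so it can be parsed straight out of the tail buffer.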
  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
      int psLen, int psAbsOffset) throws IOException {
    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
    assert bb.hasArray();
    CodedInputStream in = CodedInputStream.newInstance(
        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
    checkOrcVersion(LOG, path, ps.getVersionList());

    // Check compression codec.
    switch (ps.getCompression()) {
      case NONE:
      case ZLIB:
      case SNAPPY:
      case LZO:
        break;
      default:
        throw new IllegalArgumentException("Unknown compression: " + ps.getCompression());
    }
    return ps;
  }
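
  // Unlike the postscript, the footer and metadata sections may be compressed with
  // the file's codec, so both are decoded through InStream using the codec and
  // compression buffer size recovered from the postscript.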
  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
      int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
    bb.position(footerAbsPos);
    bb.limit(footerAbsPos + footerSize);
    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
        Lists.newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
  }

  private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
      int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
    bb.position(metadataAbsPos);
    bb.limit(metadataAbsPos + metadataSize);
    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
        Lists.newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
  }

  /**
   * MetaInfoObjExtractor - contains the logic to create the values for the fields
   * in ReaderImpl from the serialized fields.
   * As the fields are final, they need to be initialized in the constructor and
   * can't be set from a helper function, so this helper class is used instead.
   */
  private static class MetaInfoObjExtractor {
    final org.apache.orc.CompressionKind compressionKind;
    final CompressionCodec codec;
    final int bufferSize;
    final int metadataSize;
    final OrcProto.Metadata metadata;
    final OrcProto.Footer footer;

    MetaInfoObjExtractor(String codecStr, int bufferSize, int metadataSize,
                         ByteBuffer footerBuffer) throws IOException {
      this.compressionKind = org.apache.orc.CompressionKind.valueOf(codecStr);
      this.bufferSize = bufferSize;
      this.codec = OrcUtils.createCodec(compressionKind);
      this.metadataSize = metadataSize;

      int position = footerBuffer.position();
      int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;

      this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
      this.footer = extractFooter(
          footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);

      footerBuffer.position(position);
    }
  }
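
  // Thin adapter that exposes a protobuf OrcProto.StripeInformation through the
  // org.apache.orc.StripeInformation interface consumed by the record reader.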
  public static class StripeInformationImpl
      implements org.apache.orc.StripeInformation {
    private final OrcProto.StripeInformation stripe;

    public StripeInformationImpl(OrcProto.StripeInformation stripe) {
      this.stripe = stripe;
    }

    @Override
    public long getOffset() {
      return stripe.getOffset();
    }

    @Override
    public long getLength() {
      return stripe.getDataLength() + getIndexLength() + getFooterLength();
    }

    @Override
    public long getDataLength() {
      return stripe.getDataLength();
    }

    @Override
    public long getFooterLength() {
      return stripe.getFooterLength();
    }

    @Override
    public long getIndexLength() {
      return stripe.getIndexLength();
    }

    @Override
    public long getNumberOfRows() {
      return stripe.getNumberOfRows();
    }

    @Override
    public String toString() {
      return "offset: " + getOffset() + " data: " + getDataLength() +
          " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
          " index: " + getIndexLength();
    }
  }

  private static List<StripeInformation> convertProtoStripesToStripes(
      List<OrcProto.StripeInformation> stripes) {
    List<StripeInformation> result = new ArrayList<>(stripes.size());
    for (OrcProto.StripeInformation info : stripes) {
      result.add(new StripeInformationImpl(info));
    }
    return result;
  }
}