/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.io.orc;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;

import com.google.protobuf.CodedInputStream;
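
/**
 * The file-level ORC reader. The constructor reads and parses the file tail
 * (postscript and footer), so the stripe directory, type description, user
 * metadata, and column statistics are all served from memory; actual row
 * reading is delegated to {@link RecordReaderImpl} by the {@code rows}
 * methods.
 *
 * A minimal usage sketch (readers are normally obtained through
 * {@code OrcFile.createReader} rather than by constructing this class
 * directly):
 * <pre>
 *   Reader reader = OrcFile.createReader(fs, path);
 *   RecordReader rows = reader.rows(null); // null includes every column
 *   Object row = null;
 *   while (rows.hasNext()) {
 *     row = rows.next(row);
 *   }
 *   rows.close();
 * </pre>
 */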
final class ReaderImpl implements Reader {

  private static final Log LOG = LogFactory.getLog(ReaderImpl.class);

  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;

  private final FileSystem fileSystem;
  private final Path path;
  private final CompressionKind compressionKind;
  private final CompressionCodec codec;
  private final int bufferSize;
  private final OrcProto.Footer footer;
  private final ObjectInspector inspector;

  private static class StripeInformationImpl
      implements StripeInformation {
    private final OrcProto.StripeInformation stripe;

    StripeInformationImpl(OrcProto.StripeInformation stripe) {
      this.stripe = stripe;
    }

    @Override
    public long getOffset() {
      return stripe.getOffset();
    }

    @Override
    public long getDataLength() {
      return stripe.getDataLength();
    }

    @Override
    public long getFooterLength() {
      return stripe.getFooterLength();
    }

    @Override
    public long getIndexLength() {
      return stripe.getIndexLength();
    }

    @Override
    public long getNumberOfRows() {
      return stripe.getNumberOfRows();
    }

    @Override
    public String toString() {
      return "offset: " + getOffset() + " data: " + getDataLength() +
          " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
          " index: " + getIndexLength();
    }
  }

  @Override
  public long getNumberOfRows() {
    return footer.getNumberOfRows();
  }

  @Override
  public Iterable<String> getMetadataKeys() {
    List<String> result = new ArrayList<String>();
    for(OrcProto.UserMetadataItem item: footer.getMetadataList()) {
      result.add(item.getName());
    }
    return result;
  }

  @Override
  public ByteBuffer getMetadataValue(String key) {
    for(OrcProto.UserMetadataItem item: footer.getMetadataList()) {
      if (item.hasName() && item.getName().equals(key)) {
        return item.getValue().asReadOnlyByteBuffer();
      }
    }
    throw new IllegalArgumentException("Can't find user metadata " + key);
  }

  @Override
  public CompressionKind getCompression() {
    return compressionKind;
  }

  @Override
  public int getCompressionSize() {
    return bufferSize;
  }
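
  // Expose the footer's stripe directory through the public StripeInformation
  // interface, wrapping each protobuf entry lazily instead of copying the list.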
  @Override
  public Iterable<StripeInformation> getStripes() {
    return new Iterable<org.apache.hadoop.hive.ql.io.orc.StripeInformation>(){

      @Override
      public Iterator<org.apache.hadoop.hive.ql.io.orc.StripeInformation> iterator() {
        return new Iterator<org.apache.hadoop.hive.ql.io.orc.StripeInformation>(){
          private final Iterator<OrcProto.StripeInformation> inner =
              footer.getStripesList().iterator();

          @Override
          public boolean hasNext() {
            return inner.hasNext();
          }

          @Override
          public org.apache.hadoop.hive.ql.io.orc.StripeInformation next() {
            return new StripeInformationImpl(inner.next());
          }

          @Override
          public void remove() {
            throw new UnsupportedOperationException("remove unsupported");
          }
        };
      }
    };
  }

  @Override
  public ObjectInspector getObjectInspector() {
    return inspector;
  }

  @Override
  public long getContentLength() {
    return footer.getContentLength();
  }

  @Override
  public List<OrcProto.Type> getTypes() {
    return footer.getTypesList();
  }

  @Override
  public int getRowIndexStride() {
    return footer.getRowIndexStride();
  }

  @Override
  public ColumnStatistics[] getStatistics() {
    ColumnStatistics[] result = new ColumnStatistics[footer.getTypesCount()];
    for(int i=0; i < result.length; ++i) {
      result[i] = ColumnStatisticsImpl.deserialize(footer.getStatistics(i));
    }
    return result;
  }

  /**
   * Ensure this is an ORC file to prevent users from trying to read text
   * files or RC files as ORC files.
   * @param in the file being read
   * @param path the filename for error messages
   * @param psLen the postscript length
   * @param buffer the tail of the file
   * @throws IOException
   */
  static void ensureOrcFooter(FSDataInputStream in,
                              Path path,
                              int psLen,
                              ByteBuffer buffer) throws IOException {
    int len = OrcFile.MAGIC.length();
    if (psLen < len + 1) {
      throw new IOException("Malformed ORC file " + path +
          ". Invalid postscript length " + psLen);
    }
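    // the magic string, when present, is the last thing in the postscript,
    // which itself ends immediately before the single length byte at the very
    // end of the buffer, so work backwards from the buffer's limit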
    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1
        - len;
    byte[] array = buffer.array();
    // now look for the magic string at the end of the postscript.
    if (!Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
      // If it isn't there, this may be the 0.11.0 version of ORC.
      // Read the first 3 bytes of the file to check for the header
      in.seek(0);
      byte[] header = new byte[len];
      in.readFully(header, 0, len);
      // if it isn't there, this isn't an ORC file
      if (!Text.decode(header, 0, len).equals(OrcFile.MAGIC)) {
        throw new IOException("Malformed ORC file " + path +
            ". Invalid postscript.");
      }
    }
  }

  /**
   * Build a version string out of a list of version numbers.
   * @param version the version number as a list
   * @return the human readable form of the version string
   */
  private static String versionString(List<Integer> version) {
    StringBuilder buffer = new StringBuilder();
    for(int i=0; i < version.size(); ++i) {
      if (i != 0) {
        buffer.append('.');
      }
      buffer.append(version.get(i));
    }
    return buffer.toString();
  }

  /**
   * Check to see if this ORC file is from a future version and if so,
   * warn the user that we may not be able to read all of the column encodings.
   * @param log the logger to write any error message to
   * @param path the filename for error messages
   * @param version the version of hive that wrote the file.
   */
  static void checkOrcVersion(Log log, Path path, List<Integer> version) {
    if (version.size() >= 1) {
      int major = version.get(0);
      int minor = 0;
      if (version.size() >= 2) {
        minor = version.get(1);
      }
      if (major > OrcFile.Version.CURRENT.getMajor() ||
          (major == OrcFile.Version.CURRENT.getMajor() &&
           minor > OrcFile.Version.CURRENT.getMinor())) {
        log.warn("ORC file " + path +
            " was written by a future Hive version " +
            versionString(version) +
            ". This file may not be readable by this version of Hive.");
      }
    }
  }

  ReaderImpl(FileSystem fs, Path path) throws IOException {
    this.fileSystem = fs;
    this.path = path;
    FSDataInputStream file = fs.open(path);
    long size = fs.getFileStatus(path).getLen();
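    // read the tail of the file in one shot; DIRECTORY_SIZE_GUESS bytes is
    // usually enough to cover both the footer and the postscript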
    int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
    file.seek(size - readSize);
    ByteBuffer buffer = ByteBuffer.allocate(readSize);
    file.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
        buffer.remaining());
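    // the very last byte of the file is the length of the postscript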
    int psLen = buffer.get(readSize - 1) & 0xff; // mask so the byte reads as unsigned
    ensureOrcFooter(file, path, psLen, buffer);
    int psOffset = readSize - 1 - psLen;
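    // parse the postscript (which is never compressed) in place from the
    // buffer we already have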
    CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
        buffer.arrayOffset() + psOffset, psLen);
    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
    checkOrcVersion(LOG, path, ps.getVersionList());
    int footerSize = (int) ps.getFooterLength();
    bufferSize = (int) ps.getCompressionBlockSize();
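    // translate the protobuf compression enum into the reader's CompressionKind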
    switch (ps.getCompression()) {
      case NONE:
        compressionKind = CompressionKind.NONE;
        break;
      case ZLIB:
        compressionKind = CompressionKind.ZLIB;
        break;
      case SNAPPY:
        compressionKind = CompressionKind.SNAPPY;
        break;
      case LZO:
        compressionKind = CompressionKind.LZO;
        break;
      default:
        throw new IllegalArgumentException("Unknown compression");
    }
    codec = WriterImpl.createCodec(compressionKind);
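    // if the footer starts before the region we already read, fetch the
    // missing prefix and splice the two reads into a single buffer; otherwise
    // just narrow the existing buffer down to the footer bytes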
    int extra = Math.max(0, psLen + 1 + footerSize - readSize);
    if (extra > 0) {
      file.seek(size - readSize - extra);
      ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
      file.readFully(extraBuf.array(),
          extraBuf.arrayOffset() + extraBuf.position(), extra);
      extraBuf.position(extra);
      extraBuf.put(buffer);
      buffer = extraBuf;
      buffer.position(0);
      buffer.limit(footerSize);
    } else {
      buffer.position(psOffset - footerSize);
      buffer.limit(psOffset);
    }
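    // decompress (when a codec is configured) and parse the footer, then build
    // an ObjectInspector for the file's root struct type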
    InputStream instream = InStream.create("footer", new ByteBuffer[]{buffer},
        new long[]{0L}, footerSize, codec, bufferSize);
    footer = OrcProto.Footer.parseFrom(instream);
    inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
    file.close();
  }

  @Override
  public RecordReader rows(boolean[] include) throws IOException {
    return rows(0, Long.MAX_VALUE, include, null, null);
  }

  @Override
  public RecordReader rows(long offset, long length, boolean[] include
                           ) throws IOException {
    return rows(offset, length, include, null, null);
  }
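
  // When a search argument and the matching column names are supplied,
  // RecordReaderImpl can use the row index statistics to skip row groups that
  // cannot satisfy the predicate.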
  @Override
  public RecordReader rows(long offset, long length, boolean[] include,
                           SearchArgument sarg, String[] columnNames
                           ) throws IOException {
    // if included columns is null, then include all columns
    if (include == null) {
      include = new boolean[footer.getTypesCount()];
      Arrays.fill(include, true);
    }
    return new RecordReaderImpl(this.getStripes(), fileSystem, path, offset,
        length, footer.getTypesList(), codec, bufferSize,
        include, footer.getRowIndexStride(), sarg, columnNames);
  }
}