blob: b63f19d28e068c4c2264a06882a44c8a70caddc6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "orc/Exceptions.hh"
#include "RLE.hh"
#include "Reader.hh"
#include "StripeStream.hh"
#include "wrap/coded-stream-wrapper.h"

#include <sstream>
#include <stdexcept>
namespace orc {
/**
 * Build the per-stripe stream provider.
 *
 * @param _reader         row reader that owns this stripe view
 * @param _index          ordinal of the stripe (used in error messages)
 * @param _stripeInfo     metadata describing the stripe's location and sizes
 * @param _footer         the stripe's parsed footer (stream list, encodings)
 * @param _stripeStart    file offset at which the stripe's streams begin
 * @param _input          input the stream bytes are read from
 * @param _writerTimezone timezone recorded by the writer of this stripe
 */
StripeStreamsImpl::StripeStreamsImpl(const RowReaderImpl& _reader,
                                     uint64_t _index,
                                     const proto::StripeInformation& _stripeInfo,
                                     const proto::StripeFooter& _footer,
                                     uint64_t _stripeStart,
                                     InputStream& _input,
                                     const Timezone& _writerTimezone)
    : reader(_reader),
      stripeInfo(_stripeInfo),
      footer(_footer),
      stripeIndex(_index),
      stripeStart(_stripeStart),
      input(_input),
      writerTimezone(_writerTimezone) {
  // Everything is established by the member initializer list above.
}
// Out-of-line destructor; no explicit cleanup is performed here, members are
// torn down by their own destructors.
StripeStreamsImpl::~StripeStreamsImpl() {
}
// Out-of-line destructor for the StreamInformation interface; intentionally
// empty.
StreamInformation::~StreamInformation() {
}
// Out-of-line destructor for the StripeInformation interface; intentionally
// empty.
StripeInformation::~StripeInformation() {
}
// Out-of-line destructor; nothing to release explicitly.
StreamInformationImpl::~StreamInformationImpl() {
}
/**
 * @return the owning row reader's column-selection flags, returned by value.
 */
const std::vector<bool> StripeStreamsImpl::getSelectedColumns() const {
  return reader.getSelectedColumns();  // delegate: the stripe shares the reader's projection
}
/**
 * Look up the encoding recorded in the stripe footer for a column.
 *
 * @param columnId column whose encoding is requested; used directly as the
 *        index into the footer's `columns` list
 * @return a copy of that column's encoding entry
 */
proto::ColumnEncoding StripeStreamsImpl::getEncoding(uint64_t columnId) const {
  const int footerIndex = static_cast<int>(columnId);
  return footer.columns(footerIndex);
}
/**
 * @return the writer timezone this stripe view was constructed with.
 */
const Timezone& StripeStreamsImpl::getWriterTimezone() const {
  const Timezone& tz = writerTimezone;
  return tz;
}
/**
 * @return the error stream configured in the owning reader's file contents
 *         (may be whatever pointer the reader was set up with).
 */
std::ostream* StripeStreamsImpl::getErrorStream() const {
  const auto& contents = reader.getFileContents();
  return contents.errorStream;
}
/**
 * Open the stream of the requested kind for the requested column.
 *
 * The footer lists the stripe's streams in file order without explicit
 * offsets, so the file offset of each stream is the stripe start plus the
 * lengths of all streams listed before it.
 *
 * @param columnId     column whose stream is wanted
 * @param kind         stream kind to look for
 * @param shouldStream when true, read in chunks of the input's natural read
 *                     size; otherwise read the whole stream as one block
 * @return a stream (wrapped by createDecompressor according to the file's
 *         compression settings), or an empty pointer when the footer lists
 *         no matching stream
 * @throws ParseError if the matching stream would extend past the end of
 *         the stripe's index and data sections
 */
std::unique_ptr<SeekableInputStream>
StripeStreamsImpl::getStream(uint64_t columnId,
                             proto::Stream_Kind kind,
                             bool shouldStream) const {
  uint64_t offset = stripeStart;
  const uint64_t dataEnd =
      stripeInfo.offset() + stripeInfo.indexlength() + stripeInfo.datalength();
  MemoryPool* pool = reader.getFileContents().pool;
  for (int i = 0; i < footer.streams_size(); ++i) {
    const proto::Stream& stream = footer.streams(i);
    const uint64_t streamLength = stream.length();
    if (stream.has_kind() &&
        stream.kind() == kind &&
        stream.column() == columnId) {
      // Equivalent to `offset + streamLength > dataEnd`, but written so the
      // comparison cannot wrap around: a corrupt, huge length must not slip
      // past this check and then be used as a read / block size below.
      if (streamLength > dataEnd || offset > dataEnd - streamLength) {
        std::stringstream msg;
        msg << "Malformed stream meta at stream index " << i << " in stripe " << stripeIndex
            << ": streamOffset=" << offset << ", streamLength=" << streamLength
            << ", stripeOffset=" << stripeInfo.offset() << ", stripeIndexLength="
            << stripeInfo.indexlength() << ", stripeDataLength=" << stripeInfo.datalength();
        throw ParseError(msg.str());
      }
      // Only pick the block size once the stream is known to be in bounds.
      const uint64_t blockSize =
          shouldStream ? input.getNaturalReadSize() : streamLength;
      return createDecompressor(reader.getCompression(),
                                std::unique_ptr<SeekableInputStream>
                                (new SeekableFileInputStream
                                 (&input,
                                  offset,
                                  streamLength,
                                  *pool,
                                  blockSize)),
                                reader.getCompressionSize(),
                                *pool);
    }
    offset += streamLength;
  }
  return std::unique_ptr<SeekableInputStream>();
}
/**
 * @return the memory pool held in the owning reader's file contents.
 */
MemoryPool& StripeStreamsImpl::getMemoryPool() const {
  MemoryPool* const pool = reader.getFileContents().pool;
  return *pool;
}
/**
 * @return the owning row reader's Hive 0.11 decimal-overflow setting.
 */
bool StripeStreamsImpl::getThrowOnHive11DecimalOverflow() const {
  const bool shouldThrow = reader.getThrowOnHive11DecimalOverflow();
  return shouldThrow;
}
/**
 * @return the owning row reader's forced scale for Hive 0.11 decimals.
 */
int32_t StripeStreamsImpl::getForcedScaleOnHive11Decimal() const {
  const int32_t forcedScale = reader.getForcedScaleOnHive11Decimal();
  return forcedScale;
}
void StripeInformationImpl::ensureStripeFooterLoaded() const {
if (stripeFooter.get() == nullptr) {
std::unique_ptr<SeekableInputStream> pbStream =
createDecompressor(compression,
std::unique_ptr<SeekableInputStream>
(new SeekableFileInputStream(stream,
offset +
indexLength +
dataLength,
footerLength,
memory)),
blockSize,
memory);
stripeFooter.reset(new proto::StripeFooter());
if (!stripeFooter->ParseFromZeroCopyStream(pbStream.get())) {
throw ParseError("Failed to parse the stripe footer");
}
}
}
/**
 * Describe one stream of this stripe.
 *
 * The footer does not store stream offsets, so the file offset of stream
 * `streamId` is this stripe's offset plus the lengths of all streams listed
 * before it in the footer.
 *
 * @param streamId index of the stream in the stripe footer's stream list
 * @return a newly allocated description of that stream, owned by the caller
 * @throws ParseError if the stripe footer cannot be loaded
 * @throws std::out_of_range if streamId is not a valid index into the
 *         footer's stream list (previously this indexed out of bounds)
 */
std::unique_ptr<StreamInformation>
StripeInformationImpl::getStreamInformation(uint64_t streamId) const {
  ensureStripeFooterLoaded();
  const int streamCount = stripeFooter->streams_size();
  if (streamId >= static_cast<uint64_t>(streamCount)) {
    std::stringstream msg;
    msg << "Stream index " << streamId
        << " is out of range for a stripe with " << streamCount << " streams";
    throw std::out_of_range(msg.str());
  }
  // Safe after the bounds check above: streamId < streamCount <= INT_MAX.
  const int target = static_cast<int>(streamId);
  uint64_t streamOffset = offset;
  for (int s = 0; s < target; ++s) {
    streamOffset += stripeFooter->streams(s).length();
  }
  return ORC_UNIQUE_PTR<StreamInformation>
    (new StreamInformationImpl(streamOffset,
                               stripeFooter->streams(target)));
}
}