blob: b3ff372bfc6ba9c24aa6e39c686ed1891e777881 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_SCANNER_CONTEXT_INLINE_H
#define IMPALA_EXEC_SCANNER_CONTEXT_INLINE_H
#include "exec/scanner-context.h"
#include "exec/read-write-util.h"
#include "runtime/string-buffer.h"
namespace impala {
/// Macro to return false if condition is false. Only defined for this file.
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return false
/// Handle the fast common path where all the bytes are in the first buffer. This
/// is the path used by sequence/rc/parquet file formats to read a very small number
/// (i.e. single int) of bytes.
inline bool ScannerContext::Stream::GetBytes(int64_t requested_len, uint8_t** buffer,
int64_t* out_len, Status* status, bool peek) {
if (UNLIKELY(requested_len < 0)) {
*status = ReportInvalidRead(requested_len);
return false;
}
if (UNLIKELY(requested_len == 0)) {
*out_len = 0;
return true;
}
if (LIKELY(requested_len <= *output_buffer_bytes_left_)) {
*out_len = requested_len;
*buffer = *output_buffer_pos_;
if (LIKELY(!peek)) {
total_bytes_returned_ += *out_len;
AdvanceBufferPos(*out_len, output_buffer_pos_, output_buffer_bytes_left_);
}
return true;
}
DCHECK_GT(requested_len, 0);
*status = GetBytesInternal(requested_len, buffer, peek, out_len);
return status->ok();
}
inline bool ScannerContext::Stream::ReadBytes(int64_t length, uint8_t** buf,
Status* status, bool peek) {
int64_t bytes_read;
RETURN_IF_FALSE(GetBytes(length, buf, &bytes_read, status, peek));
if (UNLIKELY(length != bytes_read)) {
DCHECK_LT(bytes_read, length);
*status = ReportIncompleteRead(length, bytes_read);
return false;
}
return true;
}
inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
DCHECK_GE(length, 0);
int64_t bytes_left = length;
// Skip bytes from the boundary buffer first.
if (boundary_buffer_bytes_left_ > 0) {
DCHECK_EQ(output_buffer_pos_, &boundary_buffer_pos_);
DCHECK_EQ(output_buffer_bytes_left_, &boundary_buffer_bytes_left_);
int64_t boundary_buffer_bytes_to_skip =
std::min(bytes_left, boundary_buffer_bytes_left_);
AdvanceBufferPos(boundary_buffer_bytes_to_skip, &boundary_buffer_pos_,
&boundary_buffer_bytes_left_);
bytes_left -= boundary_buffer_bytes_to_skip;
total_bytes_returned_ += boundary_buffer_bytes_to_skip;
if (boundary_buffer_bytes_left_ == 0 && io_buffer_bytes_left_ > 0) {
output_buffer_pos_ = &io_buffer_pos_;
output_buffer_bytes_left_ = &io_buffer_bytes_left_;
}
if (bytes_left == 0) return true;
}
// Skip bytes from the I/O buffer second.
if (io_buffer_bytes_left_ > 0) {
int64_t io_buffer_bytes_to_skip = std::min(bytes_left, io_buffer_bytes_left_);
AdvanceBufferPos(io_buffer_bytes_to_skip, &io_buffer_pos_, &io_buffer_bytes_left_);
bytes_left -= io_buffer_bytes_to_skip;
total_bytes_returned_ += io_buffer_bytes_to_skip;
if (bytes_left == 0) return true;
}
DCHECK(ValidateBufferPointers());
DCHECK_GT(bytes_left, 0);
// Slow path: need to skip data in subsequent buffers.
return SkipBytesInternal(length, bytes_left, status);
}
inline bool ScannerContext::Stream::SkipText(Status* status) {
int64_t len;
RETURN_IF_FALSE(ReadVLong(&len, status));
if (len < 0) {
*status = Status("SkipText: length is negative");
return false;
}
RETURN_IF_FALSE(SkipBytes(len, status));
return true;
}
inline bool ScannerContext::Stream::ReadText(uint8_t** buf, int64_t* len,
Status* status) {
RETURN_IF_FALSE(ReadVLong(len, status));
RETURN_IF_FALSE(ReadBytes(*len, buf, status));
return true;
}
inline bool ScannerContext::Stream::ReadBoolean(bool* b, Status* status) {
uint8_t* val;
RETURN_IF_FALSE(ReadBytes(1, &val, status));
*b = (*val != 0);
return true;
}
inline bool ScannerContext::Stream::ReadInt(int32_t* val, Status* status, bool peek) {
uint8_t* bytes;
RETURN_IF_FALSE(ReadBytes(sizeof(uint32_t), &bytes, status, peek));
*val = ReadWriteUtil::GetInt<uint32_t>(bytes);
return true;
}
inline bool ScannerContext::Stream::ReadVInt(int32_t* value, Status* status) {
int64_t vlong;
RETURN_IF_FALSE(ReadVLong(&vlong, status));
*value = static_cast<int32_t>(vlong);
return true;
}
inline bool ScannerContext::Stream::ReadVLong(int64_t* value, Status* status) {
int8_t* firstbyte;
uint8_t* bytes;
RETURN_IF_FALSE(ReadBytes(1, reinterpret_cast<uint8_t**>(&firstbyte), status));
int len = ReadWriteUtil::DecodeVIntSize(*firstbyte);
bool is_negative = ReadWriteUtil::IsNegativeVInt(*firstbyte);
if (len > ReadWriteUtil::MAX_VINT_LEN) {
*status = Status("ReadVLong: size is too big");
return false;
}
if (len == 1) {
*value = static_cast<int64_t>(*firstbyte);
return true;
}
--len;
RETURN_IF_FALSE(ReadBytes(len, &bytes, status));
*value = 0;
for (int i = 0; i < len; i++) {
*value = (*value << 8) | (bytes[i] & 0xFF);
}
if (is_negative) *value = *value ^ (static_cast<int64_t>(-1));
return true;
}
inline bool ScannerContext::Stream::ReadZLong(int64_t* value, Status* status) {
uint8_t* bytes;
int64_t bytes_len;
RETURN_IF_FALSE(
GetBytes(ReadWriteUtil::MAX_ZLONG_LEN, &bytes, &bytes_len, status, true));
uint8_t* new_bytes = bytes;
ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_bytes, bytes + bytes_len);
if (UNLIKELY(!r.ok)) {
*status = ReportInvalidInt();
return false;
}
*value = r.val;
int64_t bytes_read = new_bytes - bytes;
RETURN_IF_FALSE(SkipBytes(bytes_read, status));
return true;
}
inline void ScannerContext::Stream::AdvanceBufferPos(int64_t bytes,
uint8_t** buffer_pos, int64_t* buffer_bytes_left) {
DCHECK_LE(bytes, *buffer_bytes_left);
*buffer_pos += bytes;
*buffer_bytes_left -= bytes;
}
#undef RETURN_IF_FALSE
} // namespace impala
#endif