blob: 5c6f0490340d57fe666c5a36e40c35baae53b3a4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_SCANNER_CONTEXT_INLINE_H
#define IMPALA_EXEC_SCANNER_CONTEXT_INLINE_H
#include "exec/scanner-context.h"
#include "exec/read-write-util.h"
#include "runtime/string-buffer.h"
namespace impala {
/// Macro to return false if condition is false. Only defined for this file.
#define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return false
/// Handle the fast common path where all the bytes are in the first buffer. This
/// is the path used by sequence/rc/parquet file formats to read a very small number
/// (i.e. single int) of bytes.
inline bool ScannerContext::Stream::GetBytes(int64_t requested_len, uint8_t** buffer,
int64_t* out_len, Status* status, bool peek) {
if (UNLIKELY(requested_len < 0)) {
*status = ReportInvalidRead(requested_len);
return false;
}
if (UNLIKELY(requested_len == 0)) {
*out_len = 0;
return true;
}
if (LIKELY(requested_len <= *output_buffer_bytes_left_)) {
*out_len = requested_len;
*buffer = *output_buffer_pos_;
if (LIKELY(!peek)) {
total_bytes_returned_ += *out_len;
*output_buffer_pos_ += *out_len;
*output_buffer_bytes_left_ -= *out_len;
}
return true;
}
DCHECK_GT(requested_len, 0);
*status = GetBytesInternal(requested_len, buffer, peek, out_len);
return status->ok();
}
inline bool ScannerContext::Stream::ReadBytes(int64_t length, uint8_t** buf,
Status* status, bool peek) {
int64_t bytes_read;
RETURN_IF_FALSE(GetBytes(length, buf, &bytes_read, status, peek));
if (UNLIKELY(length != bytes_read)) {
DCHECK_LT(bytes_read, length);
*status = ReportIncompleteRead(length, bytes_read);
return false;
}
return true;
}
/// TODO: consider implementing a Skip in the context/stream object that's more
/// efficient than GetBytes.
inline bool ScannerContext::Stream::SkipBytes(int64_t length, Status* status) {
uint8_t* dummy_buf;
int64_t bytes_read;
RETURN_IF_FALSE(GetBytes(length, &dummy_buf, &bytes_read, status));
if (UNLIKELY(length != bytes_read)) {
DCHECK_LT(bytes_read, length);
*status = ReportIncompleteRead(length, bytes_read);
return false;
}
return true;
}
inline bool ScannerContext::Stream::SkipText(Status* status) {
uint8_t* dummy_buffer;
int64_t bytes_read;
return ReadText(&dummy_buffer, &bytes_read, status);
}
inline bool ScannerContext::Stream::ReadText(uint8_t** buf, int64_t* len,
Status* status) {
RETURN_IF_FALSE(ReadVLong(len, status));
RETURN_IF_FALSE(ReadBytes(*len, buf, status));
return true;
}
inline bool ScannerContext::Stream::ReadBoolean(bool* b, Status* status) {
uint8_t* val;
RETURN_IF_FALSE(ReadBytes(1, &val, status));
*b = (*val != 0);
return true;
}
inline bool ScannerContext::Stream::ReadInt(int32_t* val, Status* status, bool peek) {
uint8_t* bytes;
RETURN_IF_FALSE(ReadBytes(sizeof(uint32_t), &bytes, status, peek));
*val = ReadWriteUtil::GetInt<uint32_t>(bytes);
return true;
}
inline bool ScannerContext::Stream::ReadVInt(int32_t* value, Status* status) {
int64_t vlong;
RETURN_IF_FALSE(ReadVLong(&vlong, status));
*value = static_cast<int32_t>(vlong);
return true;
}
inline bool ScannerContext::Stream::ReadVLong(int64_t* value, Status* status) {
int8_t* firstbyte;
uint8_t* bytes;
RETURN_IF_FALSE(ReadBytes(1, reinterpret_cast<uint8_t**>(&firstbyte), status));
int len = ReadWriteUtil::DecodeVIntSize(*firstbyte);
if (len > ReadWriteUtil::MAX_VINT_LEN) {
*status = Status("ReadVLong: size is too big");
return false;
}
if (len == 1) {
*value = static_cast<int64_t>(*firstbyte);
return true;
}
--len;
RETURN_IF_FALSE(ReadBytes(len, &bytes, status));
*value = 0;
for (int i = 0; i < len; i++) {
*value = (*value << 8) | (bytes[i] & 0xFF);
}
if (ReadWriteUtil::IsNegativeVInt(*firstbyte)) {
*value = *value ^ (static_cast<int64_t>(-1));
}
return true;
}
inline bool ScannerContext::Stream::ReadZLong(int64_t* value, Status* status) {
uint8_t* bytes;
int64_t bytes_len;
RETURN_IF_FALSE(
GetBytes(ReadWriteUtil::MAX_ZLONG_LEN, &bytes, &bytes_len, status, true));
uint8_t* new_bytes = bytes;
ReadWriteUtil::ZLongResult r = ReadWriteUtil::ReadZLong(&new_bytes, bytes + bytes_len);
if (UNLIKELY(!r.ok)) {
*status = ReportInvalidInt();
return false;
}
*value = r.val;
int64_t bytes_read = new_bytes - bytes;
RETURN_IF_FALSE(SkipBytes(bytes_read, status));
return true;
}
#undef RETURN_IF_FALSE
} // namespace impala
#endif