blob: ecbebaf994ca0183239e070b181cdcd7e92bb8b2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_READ_WRITE_UTIL_H
#define IMPALA_EXEC_READ_WRITE_UTIL_H
#include <boost/cstdint.hpp>
#include <cstring>
#include <sstream>
#include "common/logging.h"
#include "common/status.h"
#include "util/bit-util.h"
namespace impala {
/// Returns false from the enclosing function if 'x' evaluates to false.
/// The body is wrapped in do { } while (false) so that the macro expands to exactly one
/// statement: without the wrapper, "if (c) RETURN_IF_FALSE(x); else ..." would silently
/// attach the 'else' to the macro's internal 'if'. Use with a trailing semicolon.
#define RETURN_IF_FALSE(x) \
  do { \
    if (UNLIKELY(!(x))) return false; \
  } while (false)
/// Utility class of static helpers for reading and writing various data types from/to
/// raw byte buffers: big endian fixed-width integers, Hadoop Writable style
/// variable-length VInt/VLong values and zig-zag encoded integers.
/// Note: be very careful using *signed* ints. Casting from a signed int to
/// an unsigned is not a problem. However, bit shifts will do sign extension
/// on signed ints, which is rarely the right thing to do for byte level
/// operations.
class ReadWriteUtil {
public:
/// Maximum length in bytes of an encoded Writable VInt/VLong (including the first,
/// size-carrying byte).
static const int MAX_VINT_LEN = 9;
/// Maximum lengths in bytes for Zigzag encodings of an int and a long respectively.
const static int MAX_ZINT_LEN = 5;
const static int MAX_ZLONG_LEN = 10;
/// Put a zigzag encoded integer into a buffer and return its length.
static int PutZInt(int32_t integer, uint8_t* buf);
/// Put a zigzag encoded long integer into a buffer and return its length.
static int PutZLong(int64_t longint, uint8_t* buf);
/// Get a big endian integer from a buffer. The buffer does not have to be word aligned.
/// Only the explicit specializations of T provided alongside this class are usable.
template<typename T>
static T GetInt(const uint8_t* buffer);
/// Get a variable-length Long or int value from a byte buffer of length 'size'.
/// Returns the length in bytes of the encoded long/int.
/// Returns -1 if decoding would access memory beyond the buffer size or if the size
/// byte is corrupted.
static int GetVLong(uint8_t* buf, int64_t* vlong, int32_t size);
static int GetVInt(uint8_t* buf, int32_t* vint, int32_t size);
/// Writes a variable-length Long or int value to a byte buffer.
/// Returns the number of bytes written.
static int64_t PutVLong(int64_t val, uint8_t* buf);
static int64_t PutVInt(int32_t val, uint8_t* buf);
/// Returns size of the encoded long value, including the 1 byte for length.
static int VLongRequiredBytes(int64_t val);
/// Read a variable-length Long value from a byte buffer starting at the specified
/// byte offset. The buffer passed is of length 'size'; an access beyond the buffer
/// length results in -1 being returned to the caller. Otherwise returns the length in
/// bytes of the encoded value.
static int GetVLong(uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size);
/// Put an Integer into a buffer in big endian order. The buffer must be big
/// enough to hold sizeof(integer) bytes.
static void PutInt(uint8_t* buf, uint16_t integer);
static void PutInt(uint8_t* buf, uint32_t integer);
static void PutInt(uint8_t* buf, uint64_t integer);
/// Dump the first 'length' bytes of 'buf' to a Hex string.
static std::string HexDump(const uint8_t* buf, int64_t length);
static std::string HexDump(const char* buf, int64_t length);
/// Determines the sign of a VInt/VLong from the first byte.
static bool IsNegativeVInt(int8_t byte);
/// Determines the total length in bytes of a Writable VInt/VLong from the first byte.
static int DecodeVIntSize(int8_t byte);
/// Return values for ReadZLong() and ReadZInt(). We return these in a single struct,
/// rather than using an output parameter, for performance (this way both values are
/// returned as registers).
template <typename T>
struct ZResult {
/// False if there was a problem reading the value.
bool ok;
/// The decoded value. Only valid if 'ok' is true.
T val;
/// Constructs a successful result holding 'v'.
ZResult(T v) : ok(true), val(v) { }
/// Constructs a failed result ('ok' is false).
static ZResult error() { return ZResult(); }
private:
/// Only reachable through error(). Note that 'val' is left uninitialized here.
ZResult() : ok(false) { }
};
typedef ZResult<int64_t> ZLongResult;
typedef ZResult<int32_t> ZIntResult;
/// Read a zig-zag encoded long. This is the integer encoding defined by Google
/// protocol-buffers: https://developers.google.com/protocol-buffers/docs/encoding. *buf
/// is incremented past the encoded long. 'buf_end' should point to the end of 'buf'
/// (i.e. the first invalid byte).
///
/// Returns a non-OK result if the encoded int spans too many bytes. Unspecified
/// for values that have the correct number of bytes but overflow the destination type
/// (for both long and int, there are extra bits in the highest-order byte).
static inline ZLongResult ReadZLong(uint8_t** buf, uint8_t* buf_end) {
return ReadZInteger<MAX_ZLONG_LEN, ZLongResult>(buf, buf_end);
}
/// Read a zig-zag encoded int. Same contract as ReadZLong(), but limited to
/// MAX_ZINT_LEN encoded bytes.
static inline ZIntResult ReadZInt(uint8_t** buf, uint8_t* buf_end) {
return ReadZInteger<MAX_ZINT_LEN, ZIntResult>(buf, buf_end);
}
/// The following methods read data from a buffer without assuming the buffer is long
/// enough. If the buffer isn't long enough or another error occurs, they return false
/// and update the status with the error. Otherwise they return true. buffer is advanced
/// past the data read and buf_len is decremented appropriately.
/// Read a native type T (e.g. bool, float) directly into output (i.e. the next
/// sizeof(T) bytes of the input are interpreted as a T and the input is advanced by
/// sizeof(T)).
template <class T>
static bool Read(uint8_t** buf, int* buf_len, T* val, Status* status);
/// Skip the next num_bytes bytes.
static bool SkipBytes(uint8_t** buf, int* buf_len, int num_bytes, Status* status);
private:
/// Implementation for ReadZLong() and ReadZInt(). MAX_LEN is MAX_ZLONG_LEN or
/// MAX_ZINT_LEN. ZResult is the matching result type (ZLongResult or ZIntResult).
template<int MAX_LEN, typename ZResult>
static ZResult ReadZInteger(uint8_t** buf, uint8_t* buf_end);
};
/// 16-bit specialization: assembles the value from two bytes, most significant byte
/// first. Byte-wise access means 'buf' needs no particular alignment.
template<>
inline uint16_t ReadWriteUtil::GetInt(const uint8_t* buf) {
  const int high_byte = buf[0];
  const int low_byte = buf[1];
  return (high_byte << 8) | low_byte;
}
/// 32-bit specialization: assembles the value from four bytes, most significant byte
/// first. Byte-wise access means 'buf' needs no particular alignment.
template<>
inline uint32_t ReadWriteUtil::GetInt(const uint8_t* buf) {
  // Widen each byte to uint32_t before shifting. A plain 'buf[0] << 24' would shift a
  // promoted (signed) int and move a set top bit into the sign bit, which is not a
  // well-defined operation before C++20.
  return (static_cast<uint32_t>(buf[0]) << 24) | (static_cast<uint32_t>(buf[1]) << 16)
      | (static_cast<uint32_t>(buf[2]) << 8) | static_cast<uint32_t>(buf[3]);
}
/// 64-bit specialization: reads two consecutive big endian 32-bit words, the first one
/// being the most significant half of the result.
template<>
inline uint64_t ReadWriteUtil::GetInt(const uint8_t* buf) {
  uint64_t result = GetInt<uint32_t>(buf);
  result <<= 32;
  const uint64_t low_word = GetInt<uint32_t>(buf + 4);
  result |= low_word;
  return result;
}
/// Stores a 16-bit value into buf[0..1], most significant byte first.
inline void ReadWriteUtil::PutInt(uint8_t* buf, uint16_t integer) {
  const uint8_t high_byte = static_cast<uint8_t>(integer >> 8);
  const uint8_t low_byte = static_cast<uint8_t>(integer);
  buf[0] = high_byte;
  buf[1] = low_byte;
}
/// Stores a 32-bit value into buf[0..3]. The value is byte-swapped and then copied with
/// memcpy, so 'buf' does not need to be aligned.
/// NOTE(review): swapping unconditionally yields big endian output only on a little
/// endian host - confirm that is the only supported target.
inline void ReadWriteUtil::PutInt(uint8_t* buf, uint32_t integer) {
  integer = BitUtil::ByteSwap(integer);
  memcpy(buf, &integer, sizeof(integer));
}
/// Stores a 64-bit value into buf[0..7]. The value is byte-swapped and then copied with
/// memcpy, so 'buf' does not need to be aligned.
/// NOTE(review): swapping unconditionally yields big endian output only on a little
/// endian host - confirm that is the only supported target.
inline void ReadWriteUtil::PutInt(uint8_t* buf, uint64_t integer) {
  integer = BitUtil::ByteSwap(integer);
  memcpy(buf, &integer, sizeof(integer));
}
/// Decodes a variable-length int by decoding it as a VLong and truncating the result to
/// 32 bits. Returns whatever GetVLong() returns (encoded length, or -1 on error).
/// '*vint' is always written; it is set to 0 when GetVLong() fails without producing a
/// value.
inline int ReadWriteUtil::GetVInt(uint8_t* buf, int32_t* vint, int32_t size) {
  int64_t decoded = 0;
  const int encoded_len = GetVLong(buf, &decoded, size);
  *vint = static_cast<int32_t>(decoded);
  return encoded_len;
}
inline int ReadWriteUtil::GetVLong(uint8_t* buf, int64_t* vlong, int32_t size) {
return GetVLong(buf, 0, vlong, size);
}
/// Decodes the variable-length long that starts at buf[offset], where 'buf' holds
/// 'size' valid bytes in total.
/// On success stores the decoded value in '*vlong' and returns the number of bytes the
/// encoding occupies (1 to MAX_VINT_LEN).
/// Returns -1, leaving '*vlong' untouched, if 'size' is not positive, if 'offset' is
/// outside [0, size), or if the encoded value would extend past the end of the buffer.
inline int ReadWriteUtil::GetVLong(
    uint8_t* buf, int64_t offset, int64_t* vlong, int32_t size) {
  // Buffer access out of bounds: nothing to read.
  if (size <= 0) return -1;
  // Buffer access out of bounds: the first byte itself must lie inside the buffer, so
  // offset == size is rejected as well as negative offsets.
  if (offset < 0 || offset >= size) return -1;
  const int8_t firstbyte = static_cast<int8_t>(buf[offset]);
  const int len = DecodeVIntSize(firstbyte);
  // Buffer access out of bounds: only (size - offset) bytes remain after 'offset'.
  if (len > MAX_VINT_LEN || len > size - offset) return -1;
  if (len == 1) {
    // Single byte encoding: the byte is the (sign-extended) value itself.
    *vlong = static_cast<int64_t>(firstbyte);
    return len;
  }
  // Accumulate the big endian payload in an unsigned integer so that the shifts are
  // well defined even when all 8 payload bytes are used.
  uint64_t payload = 0;
  for (int i = 1; i < len; ++i) {
    payload = (payload << 8) | buf[offset + i];
  }
  // Negative values are stored as their one's complement.
  if (IsNegativeVInt(firstbyte)) payload = ~payload;
  *vlong = static_cast<int64_t>(payload);
  return len;
}
// Returns the number of bytes needed to encode 'val' as a VLong. Values in [-112, 127]
// take a single byte. Every other value takes one length byte plus as many bytes as are
// needed to hold the significant bits of 'val' (or of its one's complement if 'val' is
// negative).
inline int ReadWriteUtil::VLongRequiredBytes(int64_t val) {
  const bool fits_in_one_byte = (val >= -112) && (val <= 127);
  if (fits_in_one_byte) return 1;
  // Negative values are encoded as their one's complement. The result is never zero
  // here, so counting leading zeros is well defined.
  const int64_t magnitude = (val < 0) ? ~val : val;
  const int leading_zero_bytes = __builtin_clzll(magnitude) / 8;
  return 9 - leading_zero_bytes;
}
// Writes 'val' to 'buf' using the zero-compressed variable-length encoding and returns
// the number of bytes written.
// Values with -112 <= val <= 127 are written as a single byte holding the value itself.
// All other values are written as a marker byte followed by the payload, most
// significant non-zero byte first:
//  - marker v in [-120, -113]: the value is positive and -(v + 112) payload bytes follow.
//  - marker v in [-128, -121]: the value is negative and -(v + 120) payload bytes follow;
//    the payload holds the one's complement of the value.
// For more information, see the documentation for 'WritableUtils.writeVLong()' method:
// https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/WritableUtils.html
inline int64_t ReadWriteUtil::PutVLong(int64_t val, uint8_t* buf) {
  const int64_t encoded_len = VLongRequiredBytes(val);
  if (encoded_len == 1) {
    DCHECK(val >= -112 && val <= 127);
    // Small values are stored directly; there is no separate marker byte.
    buf[0] = static_cast<int8_t>(val);
    return 1;
  }
  // One marker byte plus between 1 and 8 payload bytes.
  DCHECK_GE(encoded_len, 2);
  DCHECK_LE(encoded_len, 9);
  const bool is_negative = val < 0;
  if (is_negative) {
    DCHECK_LT(val, -112);
    // Negative values are stored as their one's complement.
    // See the source code for WritableUtils.writeVLong() method:
    // https://hadoop.apache.org/docs/r2.7.2/api/src-html/org/apache/hadoop/io/
    // WritableUtils.html#line.271
    val = ~val;
  } else {
    DCHECK_GT(val, 127);
  }
  // The marker byte must satisfy -(buf[0] + base + 1) == encoded_len - 1, where base is
  // 119 for negative values (markers -121..-128) and 111 for positive values (markers
  // -113..-120). Note that 'encoded_len' includes the marker byte itself.
  buf[0] = -(encoded_len + (is_negative ? 119 : 111));
  // Emit the payload in big endian order.
  for (int64_t remaining = encoded_len - 1; remaining > 0; --remaining) {
    buf[encoded_len - remaining] = (val >> (8 * (remaining - 1))) & 0xFF;
  }
  return encoded_len;
}
/// Writes a variable-length int by widening it to 64 bits and encoding it as a VLong.
/// Returns the number of bytes written.
inline int64_t ReadWriteUtil::PutVInt(int32_t val, uint8_t* buf) {
  const int64_t widened = val;
  return PutVLong(widened, buf);
}
/// Reads a value of native type T from the front of '*buf'.
/// On success copies sizeof(T) bytes into '*val', advances '*buf' by sizeof(T),
/// decrements '*buf_len' by sizeof(T) and returns true.
/// If fewer than sizeof(T) bytes are available, sets '*status' to an error, leaves
/// '*buf', '*buf_len' and '*val' unchanged and returns false.
template <class T>
inline bool ReadWriteUtil::Read(uint8_t** buf, int* buf_len, T* val, Status* status) {
  int val_len = sizeof(T);
  if (UNLIKELY(val_len > *buf_len)) {
    std::stringstream ss;
    ss << "Cannot read " << val_len << " bytes, buffer length is " << *buf_len;
    *status = Status(ss.str());
    return false;
  }
  // Copy the bytes instead of dereferencing a T* that points into the byte buffer:
  // '*buf' carries no alignment guarantee for T, and reading it through a T* also
  // violates the aliasing rules.
  memcpy(val, *buf, val_len);
  *buf += val_len;
  *buf_len -= val_len;
  return true;
}
/// Skips the next 'num_bytes' bytes of the buffer.
/// On success advances '*buf' by 'num_bytes', decrements '*buf_len' by 'num_bytes' and
/// returns true.
/// If 'num_bytes' is negative or larger than '*buf_len', sets '*status' to an error,
/// leaves '*buf' and '*buf_len' unchanged and returns false.
inline bool ReadWriteUtil::SkipBytes(uint8_t** buf, int* buf_len, int num_bytes,
    Status* status) {
  DCHECK_GE(*buf_len, 0);
  // A negative count must be rejected too: it would move '*buf' backwards and make
  // '*buf_len' grow beyond the real size of the buffer.
  if (UNLIKELY(num_bytes < 0 || num_bytes > *buf_len)) {
    std::stringstream ss;
    ss << "Cannot skip " << num_bytes << " bytes, buffer length is " << *buf_len;
    *status = Status(ss.str());
    return false;
  }
  *buf += num_bytes;
  *buf_len -= num_bytes;
  return true;
}
/// Returns true if the first byte of an encoded VInt/VLong denotes a negative value:
/// either a marker byte below -120 or a directly stored value in [-112, -1].
inline bool ReadWriteUtil::IsNegativeVInt(int8_t byte) {
  if (byte < -120) return true;
  return byte >= -112 && byte < 0;
}
/// Returns the total encoded length in bytes (first byte included) of a VInt/VLong,
/// derived from its first byte:
///  - byte >= -112: the byte is the value itself, so the length is 1.
///  - byte <  -120: the length is -119 - byte.
///  - otherwise:    the length is -111 - byte.
inline int ReadWriteUtil::DecodeVIntSize(int8_t byte) {
  if (byte >= -112) return 1;
  const int bias = (byte < -120) ? -119 : -111;
  return bias - byte;
}
}
#endif