blob: c9f7bb8be9bd902558b691f354fd60b3e1c92677 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef GEODE_DATAINPUT_H_
#define GEODE_DATAINPUT_H_
#include <cstring>
#include <iosfwd>
#include <memory>
#include <string>
#include <vector>
#include "ExceptionTypes.hpp"
#include "internal/DSCode.hpp"
#include "internal/geode_globals.hpp"
/**
* @file
*/
// Bounds-check a pending read of `x` bytes by calling
// DataInput::_checkBufferSize with the current source line, so that the
// resulting error message can report where the read was attempted.
#define _GEODE_CHECK_BUFFER_SIZE(x) _checkBufferSize(x, __LINE__)
namespace apache {
namespace geode {
namespace client {
class Cache;
class CacheableString;
class DataInput;
class Serializable;
class SerializationRegistry;
class CacheImpl;
class DataInputInternal;
class Pool;
/**
* Provide operations for reading primitive data values, byte arrays,
* strings, <code>Serializable</code> objects from a byte stream.
* This class is intentionally not thread safe.
* @remarks None of the output parameters in the methods below can be nullptr
* unless otherwise noted.
*/
class APACHE_GEODE_EXPORT DataInput {
public:
/**
 * Read a single signed byte from the <code>DataInput</code> and advance the
 * cursor by one.
 *
 * @return signed byte read from stream
 * @throws OutOfRangeException if no unread byte remains in the buffer
 */
inline int8_t read() {
  _GEODE_CHECK_BUFFER_SIZE(1);
  const int8_t value = readNoCheck();
  return value;
}
/**
 * Read a boolean value from the <code>DataInput</code>.
 *
 * @return <code>true</code> only when the byte read equals 1; every other
 *         byte value yields <code>false</code>
 * @throws OutOfRangeException if no unread byte remains in the buffer
 */
inline bool readBoolean() {
  _GEODE_CHECK_BUFFER_SIZE(1);
  const uint8_t raw = *m_buf;
  ++m_buf;
  return raw == 1;
}
/**
 * Copy the given number of unsigned bytes out of the <code>DataInput</code>.
 * @remarks This method is complementary to
 * <code>DataOutput::writeBytesOnly</code> and, unlike
 * <code>readBytes</code>, does not expect the length of the array
 * in the stream. A <code>len</code> of zero is a no-op.
 *
 * @param buffer caller-provided array that receives the bytes
 * @param len number of unsigned bytes to be read
 * @throws OutOfRangeException if fewer than <code>len</code> bytes remain
 */
inline void readBytesOnly(uint8_t* buffer, size_t len) {
  if (len == 0) {
    return;
  }
  _GEODE_CHECK_BUFFER_SIZE(len);
  std::memcpy(buffer, m_buf, len);
  m_buf += len;
}
/**
 * Copy the given number of signed bytes out of the <code>DataInput</code>.
 * @remarks This method is complementary to
 * <code>DataOutput::writeBytesOnly</code> and, unlike
 * <code>readBytes</code>, does not expect the length of the array
 * in the stream. A <code>len</code> of zero is a no-op.
 *
 * @param buffer caller-provided array that receives the bytes
 * @param len number of signed bytes to be read
 * @throws OutOfRangeException if fewer than <code>len</code> bytes remain
 */
inline void readBytesOnly(int8_t* buffer, size_t len) {
  if (len == 0) {
    return;
  }
  _GEODE_CHECK_BUFFER_SIZE(len);
  std::memcpy(buffer, m_buf, len);
  m_buf += len;
}
/**
 * Read a length-prefixed array of unsigned bytes from the
 * <code>DataInput</code>. The length is decoded with
 * <code>readArrayLength</code>.
 * @remarks This method is complementary to
 * <code>DataOutput::writeBytes</code>.
 *
 * @param bytes receives a newly allocated array holding the bytes, or
 *              <code>nullptr</code> when the decoded length is not positive
 * @param len receives the decoded length (may be 0 or -1)
 * @throws OutOfRangeException if fewer bytes remain than the decoded length
 */
inline void readBytes(uint8_t** bytes, int32_t* len) {
  const auto length = readArrayLength();
  *len = length;
  if (length <= 0) {
    *bytes = nullptr;
    return;
  }
  _GEODE_CHECK_BUFFER_SIZE(length);
  uint8_t* buffer = nullptr;
  _GEODE_NEW(buffer, uint8_t[length]);
  std::memcpy(buffer, m_buf, length);
  m_buf += length;
  *bytes = buffer;
}
/**
 * Read a length-prefixed array of signed bytes from the
 * <code>DataInput</code>. The length is decoded with
 * <code>readArrayLength</code>.
 * @remarks This method is complementary to
 * <code>DataOutput::writeBytes</code>.
 *
 * @param bytes receives a newly allocated array holding the bytes, or
 *              <code>nullptr</code> when the decoded length is not positive
 * @param len receives the decoded length (may be 0 or -1)
 * @throws OutOfRangeException if fewer bytes remain than the decoded length
 */
inline void readBytes(int8_t** bytes, int32_t* len) {
  const auto length = readArrayLength();
  *len = length;
  if (length <= 0) {
    *bytes = nullptr;
    return;
  }
  _GEODE_CHECK_BUFFER_SIZE(length);
  int8_t* buffer = nullptr;
  _GEODE_NEW(buffer, int8_t[length]);
  std::memcpy(buffer, m_buf, length);
  m_buf += length;
  *bytes = buffer;
}
/**
 * Read a 16-bit signed integer from the <code>DataInput</code>
 * (most significant byte first).
 *
 * @return 16-bit signed integer read from stream
 * @throws OutOfRangeException if fewer than 2 bytes remain
 */
inline int16_t readInt16() {
  _GEODE_CHECK_BUFFER_SIZE(2);
  const int16_t value = readInt16NoCheck();
  return value;
}
/**
 * Read a 32-bit signed integer from the <code>DataInput</code>
 * (most significant byte first).
 *
 * @return 32-bit signed integer read from stream
 * @throws OutOfRangeException if fewer than 4 bytes remain
 */
inline int32_t readInt32() {
  _GEODE_CHECK_BUFFER_SIZE(4);
  // Assemble the four bytes in an unsigned accumulator, then reinterpret
  // the bit pattern as signed.
  uint32_t bits = 0;
  for (int i = 0; i < 4; ++i) {
    bits = (bits << 8) | *(m_buf++);
  }
  return static_cast<int32_t>(bits);
}
/**
 * Read a 64-bit signed integer from the <code>DataInput</code>
 * (most significant byte first).
 *
 * @return 64-bit signed integer read from stream
 * @throws OutOfRangeException if fewer than 8 bytes remain
 */
inline int64_t readInt64() {
  _GEODE_CHECK_BUFFER_SIZE(8);
  // Assemble the eight bytes in an unsigned accumulator, then reinterpret
  // the bit pattern as signed.
  uint64_t bits = 0;
  for (int i = 0; i < 8; ++i) {
    bits = (bits << 8) | *(m_buf++);
  }
  return static_cast<int64_t>(bits);
}
/**
 * Read a 32-bit signed integer array length value from the
 * <code>DataInput</code> in a manner compatible with java server's
 * <code>DataSerializer.readArrayLength</code>.
 *
 * The first byte selects the encoding:
 *  - 0xFF: the length is -1
 *  - 0xFE: an unsigned 16-bit length follows
 *  - 0xFD: a 32-bit length follows
 *  - anything up to 252: the byte itself is the length
 *
 * @return the decoded array length
 * @throws OutOfRangeException if the buffer ends inside the encoded length
 */
inline int32_t readArrayLength() {
  const uint8_t code = read();
  // 252 is java's ((byte)-4 & 0xFF); smaller codes are the length itself.
  if (code <= 252) {
    return code;
  }
  if (code == 0xFF) {
    return -1;
  }
  if (code == 0xFE) {
    return static_cast<uint16_t>(readInt16());
  }
  if (code == 0xFD) {
    return readInt32();
  }
  throw IllegalStateException("unexpected array length code");
}
/**
 * Decode a 64 bit integer stored as a variable length sequence of bytes.
 * Each byte contributes its low 7 bits, least significant group first; a
 * set high bit means another byte follows.
 *
 * This is taken from the varint encoding in protobufs (BSD licensed).
 * See https://developers.google.com/protocol-buffers/docs/encoding
 *
 * @return the decoded value
 * @throws IllegalStateException if the continuation bit is still set after
 *         ten bytes
 * @throws OutOfRangeException if the buffer ends inside the encoded value
 */
inline int64_t readUnsignedVL() {
  // Accumulate in an unsigned type: the tenth byte is shifted left by 63,
  // which would overflow (undefined behaviour) in a signed 64-bit integer.
  uint64_t result = 0;
  for (int32_t shift = 0; shift < 64; shift += 7) {
    const auto b = static_cast<uint8_t>(read());
    result |= static_cast<uint64_t>(b & 0x7F) << shift;
    if ((b & 0x80) == 0) {
      return static_cast<int64_t>(result);
    }
  }
  throw IllegalStateException("Malformed variable length integer");
}
/**
 * Read a float from the <code>DataInput</code>. The value is stored in the
 * stream as the 32-bit pattern read by <code>readInt32</code>.
 *
 * @return the float read from stream
 * @throws OutOfRangeException if fewer than 4 bytes remain
 */
inline float readFloat() {
  static_assert(sizeof(float) == sizeof(uint32_t),
                "float must be 32 bits wide");
  _GEODE_CHECK_BUFFER_SIZE(4);
  const auto bits = static_cast<uint32_t>(readInt32());
  // Copy the bit pattern instead of punning through a union: reading the
  // inactive member of a union is undefined behaviour in C++.
  float value;
  std::memcpy(&value, &bits, sizeof(value));
  return value;
}
/**
 * Read a double precision number from the <code>DataInput</code>. The value
 * is stored in the stream as the 64-bit pattern read by
 * <code>readInt64</code>.
 *
 * @return the double precision number read from stream
 * @throws OutOfRangeException if fewer than 8 bytes remain
 */
inline double readDouble() {
  static_assert(sizeof(double) == sizeof(uint64_t),
                "double must be 64 bits wide");
  _GEODE_CHECK_BUFFER_SIZE(8);
  const auto bits = static_cast<uint64_t>(readInt64());
  // Copy the bit pattern instead of punning through a union: reading the
  // inactive member of a union is undefined behaviour in C++.
  double value;
  std::memcpy(&value, &bits, sizeof(value));
  return value;
}
/**
 * Read a string via <code>readJavaModifiedUtf8</code>, without a leading
 * type code.
 *
 * @return the decoded string, in the requested character type
 */
template <class CharT = char, class... Tail>
inline std::basic_string<CharT, Tail...> readUTF() {
  using String = std::basic_string<CharT, Tail...>;
  String decoded;
  readJavaModifiedUtf8(decoded);
  return decoded;
}
/**
 * Read a one-byte string type code followed by the string payload in the
 * encoding that code selects.
 *
 * @return the decoded string; empty for <code>CacheableNullString</code>
 *         and for any type code that is not handled here
 */
template <class CharT = char, class... Tail>
inline std::basic_string<CharT, Tail...> readString() {
  std::basic_string<CharT, Tail...> value;
  const auto type = static_cast<internal::DSCode>(read());
  if (type == internal::DSCode::CacheableString) {
    readJavaModifiedUtf8(value);
  } else if (type == internal::DSCode::CacheableStringHuge) {
    readUtf16Huge(value);
  } else if (type == internal::DSCode::CacheableASCIIString) {
    readAscii(value);
  } else if (type == internal::DSCode::CacheableASCIIStringHuge) {
    readAsciiHuge(value);
  }
  // CacheableNullString yields the empty string.
  // TODO: What's the right response here? Unknown type codes currently
  // also fall through and yield the empty string.
  return value;
}
/**
 * Read a boolean that is preceded by a one-byte type id; the type id is
 * consumed and discarded.
 */
inline bool readNativeBool() {
  static_cast<void>(read());  // ignore type id
  return readBoolean();
}
/**
 * Read a 32-bit integer that is preceded by a one-byte type id; the type id
 * is consumed and discarded.
 */
inline int32_t readNativeInt32() {
  static_cast<void>(read());  // ignore type id
  return readInt32();
}
/**
 * Read a Serializable object, forwarding the given type id to
 * <code>readObjectInternal</code>.
 *
 * @param typeId type id passed through unchanged (defaults to -1)
 * @return whatever <code>readObjectInternal</code> returns
 */
inline std::shared_ptr<Serializable> readDirectObject(int8_t typeId = -1) {
  auto object = readObjectInternal(typeId);
  return object;
}
/**
 * Read a Serializable object from the DataInput.
 *
 * @return Serializable object or <code>nullptr</code>.
 */
inline std::shared_ptr<Serializable> readObject() {
  auto object = readObjectInternal();
  return object;
}
/**
 * Read a <code>Serializable</code> object from the <code>DataInput</code>
 * into <code>ptr</code>, replacing whatever it held before.
 * Null objects are handled.
 */
inline void readObject(std::shared_ptr<Serializable>& ptr) {
  readObjectInternal().swap(ptr);
}
/** Read a 16-bit value and store it in <code>*value</code> as a char16_t. */
inline void readObject(char16_t* value) {
  *value = static_cast<char16_t>(readInt16());
}
/** Read a boolean and store it in <code>*value</code>. */
inline void readObject(bool* value) {
  *value = readBoolean();
}
/** Read a signed byte and store it in <code>*value</code>. */
inline void readObject(int8_t* value) {
  *value = read();
}
/** Read a 16-bit signed integer and store it in <code>*value</code>. */
inline void readObject(int16_t* value) {
  *value = readInt16();
}
/** Read a 32-bit signed integer and store it in <code>*value</code>. */
inline void readObject(int32_t* value) {
  *value = readInt32();
}
/** Read a 64-bit signed integer and store it in <code>*value</code>. */
inline void readObject(int64_t* value) {
  *value = readInt64();
}
/** Read a float and store it in <code>*value</code>. */
inline void readObject(float* value) {
  *value = readFloat();
}
/** Read a double and store it in <code>*value</code>. */
inline void readObject(double* value) {
  *value = readDouble();
}
/** Read a length-prefixed array of 16-bit characters. */
inline std::vector<char16_t> readCharArray() {
  return readArray<char16_t>();
}
/** Read a length-prefixed array of booleans. */
inline std::vector<bool> readBooleanArray() {
  return readArray<bool>();
}
/** Read a length-prefixed array of signed bytes. */
inline std::vector<int8_t> readByteArray() {
  return readArray<int8_t>();
}
/** Read a length-prefixed array of 16-bit signed integers. */
inline std::vector<int16_t> readShortArray() {
  return readArray<int16_t>();
}
/** Read a length-prefixed array of 32-bit signed integers. */
inline std::vector<int32_t> readIntArray() {
  return readArray<int32_t>();
}
/** Read a length-prefixed array of 64-bit signed integers. */
inline std::vector<int64_t> readLongArray() {
  return readArray<int64_t>();
}
/** Read a length-prefixed array of floats. */
inline std::vector<float> readFloatArray() {
  return readArray<float>();
}
/** Read a length-prefixed array of doubles. */
inline std::vector<double> readDoubleArray() {
  return readArray<double>();
}
/**
 * Read a length-prefixed array of strings; each element is decoded with
 * <code>readString</code>.
 *
 * @return the strings read; empty when the decoded length is not positive
 */
inline std::vector<std::string> readStringArray() {
  std::vector<std::string> value;
  const auto arrLen = readArrayLength();
  if (arrLen <= 0) {
    return value;
  }
  value.reserve(arrLen);
  for (auto remaining = arrLen; remaining > 0; --remaining) {
    value.push_back(readString());
  }
  return value;
}
/**
 * Read a length-prefixed array whose elements are themselves
 * length-prefixed byte arrays (each element is read with
 * <code>readBytes</code>).
 *
 * @param arrayofBytearr receives a newly allocated array of newly allocated
 *                       byte arrays, or <code>nullptr</code> when the
 *                       decoded length is -1
 * @param arrayLength receives the decoded number of elements
 * @param elementLength receives a newly allocated array holding the length
 *                      of every element; it is NOT written when the decoded
 *                      length is -1
 * @throws OutOfRangeException if the buffer ends before all elements are
 *         read; nothing allocated by this call is leaked in that case
 */
inline void readArrayOfByteArrays(int8_t*** arrayofBytearr,
                                  int32_t& arrayLength,
                                  int32_t** elementLength) {
  const auto arrLen = readArrayLength();
  arrayLength = arrLen;
  if (arrLen == -1) {
    *arrayofBytearr = nullptr;
    return;
  }
  int8_t** tmpArray = nullptr;
  int32_t* tmpLengtharr = nullptr;
  int32_t filled = 0;  // number of elements successfully read so far
  try {
    _GEODE_NEW(tmpArray, int8_t * [arrLen]);
    _GEODE_NEW(tmpLengtharr, int32_t[arrLen]);
    for (; filled < arrLen; ++filled) {
      readBytes(&tmpArray[filled], &tmpLengtharr[filled]);
    }
  } catch (...) {
    // A failed allocation or a truncated stream must not leak what was
    // already allocated. readBytes only stores its pointer on success, so
    // exactly the first `filled` slots hold valid pointers.
    // NOTE(review): assumes _GEODE_NEW allocates with new[] — confirm.
    for (int32_t i = 0; i < filled; ++i) {
      delete[] tmpArray[i];
    }
    delete[] tmpLengtharr;
    delete[] tmpArray;
    throw;
  }
  *arrayofBytearr = tmpArray;
  *elementLength = tmpLengtharr;
}
/**
 * Get the pointer to the current buffer position. This should be treated
 * as read-only; modifying the contents through this internal pointer
 * has undefined behavior.
 */
inline const uint8_t* currentBufferPosition() const {
  return m_buf;
}
/** get the number of bytes between the buffer head and the cursor */
inline size_t getBytesRead() const {
  return static_cast<size_t>(m_buf - m_bufHead);
}
/** get the number of bytes remaining to be read in the buffer */
inline size_t getBytesRemaining() const {
  const size_t consumed = getBytesRead();
  return m_bufLength - consumed;
}
/** advance the cursor by the given offset; no bounds check is performed */
inline void advanceCursor(size_t offset) {
  m_buf = m_buf + offset;
}
/** rewind the cursor by the given offset; no bounds check is performed */
inline void rewindCursor(size_t offset) {
  m_buf = m_buf - offset;
}
/** reset the cursor to the start of buffer */
inline void reset() {
  m_buf = m_bufHead;
}
/**
 * Shrink the recorded buffer length to the number of bytes currently
 * remaining. The cursor itself is left where it is.
 *
 * NOTE(review): m_bufHead is not moved, so getBytesRead() and
 * getBytesRemaining() keep measuring from the old head after this call —
 * confirm that callers expect this.
 */
inline void setBuffer() {
  const size_t remaining = getBytesRemaining();
  m_bufLength = remaining;
}
/** place the cursor at the given offset from the start of buffer */
inline void resetPdx(size_t offset) {
  m_buf = &m_bufHead[offset];
}
/** get the recorded buffer length */
inline size_t getPdxBytes() const {
  return m_bufLength;
}
/**
 * Allocate a new array of <code>length</code> bytes and fill it with a copy
 * of the bytes starting at <code>from</code>.
 *
 * @param from first byte to copy
 * @param length number of bytes to copy
 * @return the newly allocated copy
 */
static uint8_t* getBufferCopy(const uint8_t* from, size_t length) {
  uint8_t* result = nullptr;
  _GEODE_NEW(result, uint8_t[length]);
  // memcpy returns its destination pointer, i.e. `result`.
  return static_cast<uint8_t*>(std::memcpy(result, from, length));
}
/** place the cursor at the given offset from the start of buffer */
inline void reset(size_t offset) {
  m_buf = &m_bufHead[offset];
}
uint8_t* getBufferCopyFrom(const uint8_t* from, size_t length) {
uint8_t* result;
_GEODE_NEW(result, uint8_t[length]);
std::memcpy(result, from, length);
return result;
}
/**
 * Cache accessor; defined outside this header.
 * NOTE(review): presumably returns the Cache this input belongs to —
 * confirm against the implementation.
 */
virtual Cache* getCache() const;
// A DataInput always wraps a buffer, so there is no default constructor.
DataInput() = delete;
virtual ~DataInput() noexcept = default;
// Copying is disabled; instances may only be moved.
DataInput(const DataInput&) = delete;
DataInput& operator=(const DataInput&) = delete;
DataInput(DataInput&&) = default;
DataInput& operator=(DataInput&&) = default;
protected:
/** constructor given a pre-allocated byte array with size */
DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
Pool* pool);
/** Serialization registry accessor; defined outside this header. */
virtual const SerializationRegistry& getSerializationRegistry() const;
private:
// Cursor: next byte to be read; advanced by every read method.
const uint8_t* m_buf;
// Start of the buffer; reset() moves the cursor back here.
const uint8_t* m_bufHead;
// Buffer length used by the bounds check in _checkBufferSize().
size_t m_bufLength;
// Returned by getPool().
Pool* m_pool;
// Not referenced by any inline method in this header.
const CacheImpl* m_cache;
// Backs readObject()/readDirectObject(); defined outside this header.
std::shared_ptr<Serializable> readObjectInternal(int8_t typeId = -1);
/**
 * Read a length-prefixed array of primitives into a newly allocated array.
 *
 * @param value receives the array allocated with <code>new[]</code>; it is
 *              left untouched when the decoded length is not positive
 * @param length receives the decoded array length
 * @throws OutOfRangeException if the buffer ends before all elements are
 *         read; the partially filled array is released in that case
 */
template <typename mType>
void readObject(mType** value, int32_t& length) {
  const auto arrayLen = readArrayLength();
  length = arrayLen;
  if (arrayLen <= 0) {
    return;
  }
  // Own the array until every element has been read, so that a read that
  // throws part-way through does not leak it.
  std::unique_ptr<mType[]> objArray(new mType[arrayLen]);
  for (int32_t i = 0; i < arrayLen; ++i) {
    mType tmp = 0;
    readObject(&tmp);
    objArray[i] = tmp;
  }
  *value = objArray.release();
}
/**
 * Read a length-prefixed array of primitives; each element is read with
 * the <code>readObject</code> overload matching <code>T</code>.
 *
 * @return the elements read; empty when the decoded length is negative
 *         or zero
 */
template <typename T>
std::vector<T> readArray() {
  const auto arrayLen = readArrayLength();
  std::vector<T> objArray;
  if (arrayLen < 0) {
    return objArray;
  }
  objArray.reserve(arrayLen);
  for (auto remaining = arrayLen; remaining > 0; --remaining) {
    T element = 0;
    readObject(&element);
    objArray.push_back(element);
  }
  return objArray;
}
/** Read a 16-bit value and narrow it to <code>char</code>. */
inline char readPdxChar() {
  const int16_t wide = readInt16();
  return static_cast<char>(wide);
}
/**
 * Verify that at least <code>size</code> unread bytes remain.
 *
 * @param size number of bytes about to be read
 * @param line source line of the caller, reported in the error message
 * @throws OutOfRangeException if fewer than <code>size</code> bytes remain
 */
inline void _checkBufferSize(size_t size, int32_t line) {
  const size_t available = m_bufLength - (m_buf - m_bufHead);
  if (available >= size) {
    return;
  }
  throw OutOfRangeException(
      "DataInput: attempt to read beyond buffer at line " +
      std::to_string(line) + ": available buffer size " +
      std::to_string(available) + ", attempted read of size " +
      std::to_string(size));
}
/** Read one byte without a bounds check; the caller must have checked. */
inline int8_t readNoCheck() {
  const auto value = static_cast<int8_t>(*m_buf);
  ++m_buf;
  return value;
}
/**
 * Read a 16-bit integer (most significant byte first) without a bounds
 * check; the caller must have checked that 2 bytes are available.
 */
inline int16_t readInt16NoCheck() {
  const uint8_t high = m_buf[0];
  const uint8_t low = m_buf[1];
  m_buf += 2;
  return static_cast<int16_t>((high << 8) | low);
}
/**
 * Append <code>length</code> bytes from the stream to <code>value</code>,
 * one character per byte, keeping only the low 7 bits of each byte.
 *
 * @param value string to append to
 * @param length number of bytes to read
 * @throws OutOfRangeException if fewer than <code>length</code> bytes remain
 */
template <class CharT, class... Tail>
inline void readAscii(std::basic_string<CharT, Tail...>& value,
                      size_t length) {
  _GEODE_CHECK_BUFFER_SIZE(length);
  value.reserve(length);
  for (size_t i = 0; i < length; ++i) {
    // blindly assumes ASCII so mask off 7 bits
    value += readNoCheck() & 0x7F;
  }
}
/** Read an ASCII string prefixed by an unsigned 16-bit byte count. */
template <class CharT, class... Tail>
inline void readAscii(std::basic_string<CharT, Tail...>& value) {
  const auto length = static_cast<uint16_t>(readInt16());
  readAscii(value, length);
}
/** Read an ASCII string prefixed by an unsigned 32-bit byte count. */
template <class CharT, class... Tail>
inline void readAsciiHuge(std::basic_string<CharT, Tail...>& value) {
  const auto length = static_cast<uint32_t>(readInt32());
  readAscii(value, length);
}
// Java "modified UTF-8" string readers. Only declared here; the definitions
// are not part of this header. One overload exists per supported character
// type, plus a generic declaration.
template <class _CharT, class _Traits, class _Allocator>
void readJavaModifiedUtf8(
std::basic_string<_CharT, _Traits, _Allocator>& value);
// UTF-16 destination.
template <class _Traits, class _Allocator>
void readJavaModifiedUtf8(
std::basic_string<char16_t, _Traits, _Allocator>& value);
// Narrow (char) destination.
template <class _Traits, class _Allocator>
void readJavaModifiedUtf8(
std::basic_string<char, _Traits, _Allocator>& value);
// UTF-32 destination.
template <class _Traits, class _Allocator>
void readJavaModifiedUtf8(
std::basic_string<char32_t, _Traits, _Allocator>& value);
/**
 * wchar_t overload: decode into a string of the fixed-width character type
 * that has the same size as wchar_t (char16_t or char32_t), then copy the
 * result into <code>value</code>.
 */
template <class _Traits, class _Allocator>
inline void readJavaModifiedUtf8(
    std::basic_string<wchar_t, _Traits, _Allocator>& value) {
  // TODO string optimize
  using _Fallback =
      std::conditional<sizeof(wchar_t) == sizeof(char32_t), char32_t,
                       char>::type;
  using _WcharT = std::conditional<sizeof(wchar_t) == sizeof(char16_t),
                                   char16_t, _Fallback>::type;
  std::basic_string<_WcharT> tmp;
  readJavaModifiedUtf8(tmp);
  const auto* units = reinterpret_cast<const wchar_t*>(tmp.data());
  value.assign(units, tmp.length());
}
// Generic declaration of the "huge" UTF-16 string reader; the per-character
// type overloads follow. No definition for this form is visible here.
template <class _CharT, class _Traits, class _Allocator>
void readUtf16Huge(std::basic_string<_CharT, _Traits, _Allocator>& value);
/**
 * Read a "huge" UTF-16 string: a 32-bit count of 16-bit code units followed
 * by that many 16-bit values (most significant byte first), each appended
 * to <code>value</code>.
 *
 * @param value string to append the code units to
 * @throws OutOfRangeException if the buffer holds fewer than
 *         <code>2 * count</code> bytes after the count
 */
template <class _Traits, class _Allocator>
inline void readUtf16Huge(
    std::basic_string<char16_t, _Traits, _Allocator>& value) {
  uint32_t length = readInt32();
  // Validate the count on its own first; this bounds `length` by the buffer
  // size so the doubling below cannot realistically wrap where size_t is
  // only 32 bits wide.
  _GEODE_CHECK_BUFFER_SIZE(length);
  // Every code unit consumes two bytes through the unchecked reader below,
  // so the whole payload of 2 * length bytes must be present up front.
  _GEODE_CHECK_BUFFER_SIZE(static_cast<size_t>(length) * 2);
  value.reserve(length);
  while (length-- > 0) {
    value += readInt16NoCheck();
  }
}
// "Huge" UTF-16 reader overloads for narrow and UTF-32 destinations; only
// declared here, the definitions are not part of this header.
template <class _Traits, class _Allocator>
void readUtf16Huge(std::basic_string<char, _Traits, _Allocator>& value);
template <class _Traits, class _Allocator>
void readUtf16Huge(std::basic_string<char32_t, _Traits, _Allocator>& value);
/**
 * wchar_t overload: read into a string of the fixed-width character type
 * that has the same size as wchar_t (char16_t or char32_t), then copy the
 * result into <code>value</code>.
 */
template <class _Traits, class _Allocator>
inline void readUtf16Huge(
    std::basic_string<wchar_t, _Traits, _Allocator>& value) {
  // TODO string optimize
  using _Fallback =
      std::conditional<sizeof(wchar_t) == sizeof(char32_t), char32_t,
                       char>::type;
  using _WcharT = std::conditional<sizeof(wchar_t) == sizeof(char16_t),
                                   char16_t, _Fallback>::type;
  std::basic_string<_WcharT> tmp;
  readUtf16Huge(tmp);
  const auto* units = reinterpret_cast<const wchar_t*>(tmp.data());
  value.assign(units, tmp.length());
}
/** Get the Pool pointer this input was constructed with. */
Pool* getPool() const {
  return m_pool;
}
// These classes are granted access to the private readers and state above.
friend Cache;
friend CacheImpl;
friend DataInputInternal;
friend CacheableString;
};
} // namespace client
} // namespace geode
} // namespace apache
#endif // GEODE_DATAINPUT_H_