blob: fc8425dbd3328d35beecf75598dcc1aff95ccc41 [file] [log] [blame]
// Modified from muduo project http://github.com/chenshuo/muduo
// @see https://github.com/chenshuo/muduo/blob/master/muduo/net/Buffer.h and https://github.com/chenshuo/muduo/blob/master/muduo/net/Buffer.cc
// Modified from evpp
// @see https://github.com/Qihoo360/evpp/blob/master/evpp/buffer.h, commit version b2535d7
#ifndef _TUBEMQ_BUFFER_H_
#define _TUBEMQ_BUFFER_H_
#include <arpa/inet.h>
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <memory>
#include <string>
namespace tubemq {
class Buffer;
using BufferPtr = std::shared_ptr<Buffer>;
class Buffer {
public:
static const size_t kCheapPrependSize = 0;
static const size_t kInitialSize = 8192;
explicit Buffer(size_t initial_size = kInitialSize,
size_t reserved_prepend_size = kCheapPrependSize)
: capacity_(reserved_prepend_size + initial_size),
read_index_(reserved_prepend_size),
write_index_(reserved_prepend_size),
reserved_prepend_size_(reserved_prepend_size) {
buffer_ = new char[capacity_];
has_mem_ = true;
assert(length() == 0);
assert(WritableBytes() == initial_size);
assert(PrependableBytes() == reserved_prepend_size);
}
~Buffer() {
if (has_mem_) {
delete[] buffer_;
}
buffer_ = nullptr;
capacity_ = 0;
}
std::string String() {
char buf[1024];
snprintf(buf, sizeof(buf),
"buffer:%p,capacity:%ld,readindex:%ld,writeindex:%ld,prependsize:%ld,hasmem:%d",
buffer_, capacity_, read_index_, write_index_, reserved_prepend_size_, has_mem_);
return buf;
}
BufferPtr Slice() {
auto buff = std::make_shared<Buffer>(*this);
buff->has_mem_ = false;
return buff;
}
void Swap(Buffer& rhs) {
std::swap(buffer_, rhs.buffer_);
std::swap(capacity_, rhs.capacity_);
std::swap(read_index_, rhs.read_index_);
std::swap(write_index_, rhs.write_index_);
std::swap(reserved_prepend_size_, rhs.reserved_prepend_size_);
}
// Skip advances the reading index of the buffer
void Skip(size_t len) {
if (len < length()) {
read_index_ += len;
} else {
Reset();
}
}
// Retrieve advances the reading index of the buffer
// Retrieve it the same as Skip.
void Retrieve(size_t len) { Skip(len); }
// Truncate discards all but the first n unread bytes from the buffer
// but continues to use the same allocated storage.
// It does nothing if n is greater than the length of the buffer.
void Truncate(size_t n) {
if (n == 0) {
read_index_ = reserved_prepend_size_;
write_index_ = reserved_prepend_size_;
} else if (write_index_ > read_index_ + n) {
write_index_ = read_index_ + n;
}
}
// Reset resets the buffer to be empty,
// but it retains the underlying storage for use by future writes.
// Reset is the same as Truncate(0).
void Reset() { Truncate(0); }
// Increase the capacity of the container to a value that's greater
// or equal to len. If len is greater than the current capacity(),
// new storage is allocated, otherwise the method does nothing.
void Reserve(size_t len) {
if (capacity_ >= len + reserved_prepend_size_) {
return;
}
grow(len + reserved_prepend_size_);
}
// Make sure there is enough memory space to append more data with length len
void EnsureWritableBytes(size_t len) {
if (WritableBytes() < len) {
grow(len);
}
assert(WritableBytes() >= len);
}
// ToText appends char '\0' to buffer to convert the underlying data to a c-style string text.
// It will not change the length of buffer.
void ToText() {
AppendInt8('\0');
UnwriteBytes(1);
}
// Write
public:
void Write(const void* /*restrict*/ d, size_t len) {
EnsureWritableBytes(len);
memcpy(WriteBegin(), d, len);
assert(write_index_ + len <= capacity_);
write_index_ += len;
}
void Append(const char* /*restrict*/ d, size_t len) { Write(d, len); }
void Append(const void* /*restrict*/ d, size_t len) { Write(d, len); }
void AppendInt32(int32_t x) {
int32_t be32 = htonl(x);
Write(&be32, sizeof be32);
}
void AppendInt16(int16_t x) {
int16_t be16 = htons(x);
Write(&be16, sizeof be16);
}
void AppendInt8(int8_t x) { Write(&x, sizeof x); }
void PrependInt32(int32_t x) {
int32_t be32 = htonl(x);
Prepend(&be32, sizeof be32);
}
void PrependInt16(int16_t x) {
int16_t be16 = htons(x);
Prepend(&be16, sizeof be16);
}
void PrependInt8(int8_t x) { Prepend(&x, sizeof x); }
// Insert content, specified by the parameter, into the front of reading index
void Prepend(const void* /*restrict*/ d, size_t len) {
assert(len <= PrependableBytes());
read_index_ -= len;
const char* p = static_cast<const char*>(d);
memcpy(begin() + read_index_, p, len);
}
void UnwriteBytes(size_t n) {
assert(n <= length());
write_index_ -= n;
}
void WriteBytes(size_t n) {
assert(n <= WritableBytes());
write_index_ += n;
}
// Read
public:
// Peek int32_t/int16_t/int8_t with network endian
uint32_t ReadUint32() {
uint32_t result = PeekUint32();
Skip(sizeof result);
return result;
}
int32_t ReadInt32() {
int32_t result = PeekInt32();
Skip(sizeof result);
return result;
}
int16_t ReadInt16() {
int16_t result = PeekInt16();
Skip(sizeof result);
return result;
}
int8_t ReadInt8() {
int8_t result = PeekInt8();
Skip(sizeof result);
return result;
}
std::string ToString() const { return std::string(data(), length()); }
// ReadByte reads and returns the next byte from the buffer.
// If no byte is available, it returns '\0'.
char ReadByte() {
assert(length() >= 1);
if (length() == 0) {
return '\0';
}
return buffer_[read_index_++];
}
// UnreadBytes unreads the last n bytes returned
// by the most recent read operation.
void UnreadBytes(size_t n) {
assert(n < read_index_);
read_index_ -= n;
}
// Peek
public:
// Peek int64_t/int32_t/int16_t/int8_t with network endian
uint32_t PeekUint32() const {
assert(length() >= sizeof(uint32_t));
uint32_t be32 = 0;
::memcpy(&be32, data(), sizeof be32);
return ntohl(be32);
}
int32_t PeekInt32() const {
assert(length() >= sizeof(int32_t));
int32_t be32 = 0;
::memcpy(&be32, data(), sizeof be32);
return ntohl(be32);
}
int16_t PeekInt16() const {
assert(length() >= sizeof(int16_t));
int16_t be16 = 0;
::memcpy(&be16, data(), sizeof be16);
return ntohs(be16);
}
int8_t PeekInt8() const {
assert(length() >= sizeof(int8_t));
int8_t x = *data();
return x;
}
public:
// data returns a pointer of length Buffer.length() holding the unread portion of the buffer.
// The data is valid for use only until the next buffer modification (that is,
// only until the next call to a method like Read, Write, Reset, or Truncate).
// The data aliases the buffer content at least until the next buffer modification,
// so immediate changes to the slice will affect the result of future reads.
const char* data() const { return buffer_ + read_index_; }
char* WriteBegin() { return begin() + write_index_; }
const char* WriteBegin() const { return begin() + write_index_; }
// length returns the number of bytes of the unread portion of the buffer
size_t length() const {
assert(write_index_ >= read_index_);
return write_index_ - read_index_;
}
// size returns the number of bytes of the unread portion of the buffer.
// It is the same as length().
size_t size() const { return length(); }
// capacity returns the capacity of the buffer's underlying byte slice, that is, the
// total space allocated for the buffer's data.
size_t capacity() const { return capacity_; }
size_t WritableBytes() const {
assert(capacity_ >= write_index_);
return capacity_ - write_index_;
}
size_t PrependableBytes() const { return read_index_; }
public:
char* begin() { return buffer_; }
const char* begin() const { return buffer_; }
void grow(size_t len) {
if (!has_mem_) {
return;
}
if (WritableBytes() + PrependableBytes() < len + reserved_prepend_size_) {
// grow the capacity
size_t n = (capacity_ << 1) + len;
size_t m = length();
char* d = new char[n];
memcpy(d + reserved_prepend_size_, begin() + read_index_, m);
write_index_ = m + reserved_prepend_size_;
read_index_ = reserved_prepend_size_;
capacity_ = n;
delete[] buffer_;
buffer_ = d;
} else {
// move readable data to the front, make space inside buffer
assert(reserved_prepend_size_ < read_index_);
size_t readable = length();
memmove(begin() + reserved_prepend_size_, begin() + read_index_, length());
read_index_ = reserved_prepend_size_;
write_index_ = read_index_ + readable;
assert(readable == length());
assert(WritableBytes() >= len);
}
}
private:
char* buffer_;
size_t capacity_;
size_t read_index_;
size_t write_index_;
size_t reserved_prepend_size_;
bool has_mem_{true};
};
} // namespace tubemq
#endif /* _TUBEMQ_BUFFER_H_ */