// blob: 1fb59a60324ea9c7b35dba868556b04c2d759f70 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_SHARED_BUFFER_H_
#define LIB_SHARED_BUFFER_H_
#include <boost/asio.hpp>
#include <array>
#include <cassert>
#include <cstring>
#include <vector>
namespace pulsar {
namespace detail {
/**
 * Byte storage used by owning SharedBuffer instances: a std::vector<char>
 * with int-based sizing helpers.
 */
class SharedBufferInternal : public std::vector<char> {
   public:
    /**
     * Create storage holding `size` bytes.
     *
     * @param size number of bytes to allocate
     */
    SharedBufferInternal(int size) : std::vector<char>(size) {}

    /**
     * @return pointer to the first byte of the storage
     *
     * Uses data() instead of &(*this)[0]: indexing element 0 of an empty
     * vector is undefined behaviour, while data() is always well defined.
     */
    inline char* ptr() { return data(); }

    /**
     * Change the number of stored bytes to `newSize`. This forwards to
     * std::vector<char>::resize, which may reallocate, so pointers obtained
     * earlier from ptr() must not be reused afterwards.
     */
    void resize(int newSize) { std::vector<char>::resize(newSize); }

    /**
     * @return the capacity of the underlying vector, narrowed to int
     */
    int capacity() const { return static_cast<int>(std::vector<char>::capacity()); }
};
} // namespace detail
class SharedBuffer {
public:
explicit SharedBuffer() : data_(), ptr_(0), readIdx_(0), writeIdx_(0), capacity_(0) {}
/**
* Allocate a buffer of given size
*/
static SharedBuffer allocate(const uint32_t size) { return SharedBuffer(size); }
/**
* Create a buffer with a copy of memory pointed by ptr
*/
static SharedBuffer copy(const char* ptr, uint32_t size) {
SharedBuffer buf = allocate(size);
buf.write(ptr, size);
return buf;
}
static SharedBuffer copyFrom(const SharedBuffer& other, uint32_t capacity) {
assert(other.readableBytes() <= capacity);
SharedBuffer buf = allocate(capacity);
buf.write(other.data(), other.readableBytes());
return buf;
}
/**
* Create a buffer that wraps the passed pointer, without copying the memory
*/
static SharedBuffer wrap(char* ptr, size_t size) { return SharedBuffer(ptr, size); }
inline const char* data() const { return ptr_ + readIdx_; }
inline char* mutableData() { return ptr_ + writeIdx_; }
/**
* Return a shared buffer that include a portion of current buffer. No memory is copied
*/
SharedBuffer slice(uint32_t offset) {
SharedBuffer buf(*this);
buf.consume(offset);
return buf;
}
SharedBuffer slice(uint32_t offset, uint32_t length) {
SharedBuffer buf(*this);
buf.consume(offset);
assert(buf.readableBytes() >= length);
buf.writeIdx_ = buf.readIdx_ + length;
return buf;
}
uint32_t readUnsignedInt() {
assert(readableBytes() >= sizeof(uint32_t));
uint32_t value = ntohl(*(uint32_t*)data());
consume(sizeof(uint32_t));
return value;
}
uint16_t readUnsignedShort() {
assert(readableBytes() >= sizeof(uint16_t));
uint16_t value = ntohs(*(uint16_t*)data());
consume(sizeof(uint16_t));
return value;
}
void writeUnsignedInt(uint32_t value) {
assert(writableBytes() >= sizeof(uint32_t));
*(uint32_t*)(mutableData()) = htonl(value);
bytesWritten(sizeof(value));
}
void writeUnsignedShort(uint16_t value) {
assert(writableBytes() >= sizeof(uint16_t));
*(uint16_t*)(mutableData()) = htons(value);
bytesWritten(sizeof(value));
}
inline uint32_t readableBytes() const { return writeIdx_ - readIdx_; }
inline uint32_t writableBytes() const { return capacity_ - writeIdx_; }
inline bool readable() const { return readableBytes() > 0; }
inline bool writable() const { return writableBytes() > 0; }
boost::asio::const_buffers_1 const_asio_buffer() const {
return boost::asio::const_buffers_1(ptr_ + readIdx_, readableBytes());
}
boost::asio::mutable_buffers_1 asio_buffer() {
assert(data_);
return boost::asio::buffer(ptr_ + writeIdx_, writableBytes());
}
void write(const char* data, uint32_t size) {
assert(size <= writableBytes());
std::copy(data, data + size, mutableData());
bytesWritten(size);
}
// Mark that some bytes were written into the buffer
inline void bytesWritten(uint32_t size) {
assert(size <= writableBytes());
writeIdx_ += size;
}
// Return current writer index
uint32_t writerIndex() { return writeIdx_; }
// skip writerIndex
void skipBytes(uint32_t size) {
assert(writeIdx_ + size <= capacity_);
writeIdx_ += size;
}
// set writerIndex
void setWriterIndex(uint32_t index) {
assert(index <= capacity_);
writeIdx_ = index;
}
// Return current reader index
uint32_t readerIndex() { return readIdx_; }
// set readerIndex
void setReaderIndex(uint32_t index) {
assert(index <= capacity_);
readIdx_ = index;
}
inline void consume(uint32_t size) {
assert(size <= readableBytes());
readIdx_ += size;
}
inline void rollback(uint32_t size) {
assert(size <= readIdx_);
readIdx_ -= size;
}
inline void reset() {
readIdx_ = 0;
writeIdx_ = 0;
}
private:
typedef std::shared_ptr<detail::SharedBufferInternal> BufferPtr;
BufferPtr data_;
char* ptr_;
uint32_t readIdx_;
uint32_t writeIdx_;
uint32_t capacity_;
SharedBuffer(char* ptr, size_t size)
: data_(), ptr_(ptr), readIdx_(0), writeIdx_(size), capacity_(size) {}
explicit SharedBuffer(size_t size)
: data_(std::make_shared<detail::SharedBufferInternal>(size)),
ptr_(data_->ptr()),
readIdx_(0),
writeIdx_(0),
capacity_(size) {}
};
/**
 * Fixed-size group of `Size` SharedBuffer objects exposed as a sequence of
 * boost::asio::const_buffer.
 *
 * Each slot keeps a copy of the SharedBuffer next to the asio view of its
 * readable region. For buffers with owned storage, that copy shares ownership
 * of the bytes the view points to.
 */
template <int Size>
class CompositeSharedBuffer {
   public:
    /**
     * Store `buffer` in slot `idx` and record its current readable region.
     *
     * @param idx    slot to fill; asserted to be in [0, Size)
     * @param buffer buffer to keep; later changes to the caller's copy are
     *               not reflected in the recorded region
     */
    void set(int idx, const SharedBuffer& buffer) {
        // Both arrays are indexed unchecked below, so reject bad slots here.
        assert(idx >= 0 && idx < Size);
        sharedBuffers_[idx] = buffer;
        asioBuffers_[idx] = buffer.const_asio_buffer();
    }

    // Implement the ConstBufferSequence requirements.
    typedef boost::asio::const_buffer value_type;
    typedef boost::asio::const_buffer* iterator;
    typedef const boost::asio::const_buffer* const_iterator;

    /** @return pointer to the first asio buffer of the sequence */
    const boost::asio::const_buffer* begin() const { return asioBuffers_.data(); }

    /** @return pointer one past the last asio buffer of the sequence */
    const boost::asio::const_buffer* end() const { return begin() + Size; }

   private:
    std::array<SharedBuffer, Size> sharedBuffers_;
    std::array<boost::asio::const_buffer, Size> asioBuffers_;
};
/** Composite made of exactly two shared buffers. */
using PairSharedBuffer = CompositeSharedBuffer<2>;
} // namespace pulsar
#endif /* LIB_SHARED_BUFFER_H_ */