| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #ifndef avro_BufferDetail_hh__ |
| #define avro_BufferDetail_hh__ |
| |
| #include <boost/function.hpp> |
| #include <boost/shared_array.hpp> |
| #include <boost/shared_ptr.hpp> |
| #include <boost/static_assert.hpp> |
| #include <boost/utility.hpp> |
| #include <utility> |
| #ifdef HAVE_BOOST_ASIO |
| #include <boost/asio/buffer.hpp> |
| #endif |
| #include <cassert> |
| #include <deque> |
| #include <exception> |
| |
| /** |
| * \file BufferDetail.hh |
| * |
| * \brief The implementation details for the Buffer class. |
| * |
| **/ |
| |
| namespace avro { |
| |
| namespace detail { |
| |
| typedef char data_type; |
| typedef size_t size_type; |
| #ifdef HAVE_BOOST_ASIO |
| typedef boost::asio::const_buffer ConstAsioBuffer; |
| typedef boost::asio::mutable_buffer MutableAsioBuffer; |
| #endif |
| |
| /// The size in bytes for blocks backing buffer chunks. |
| const size_type kMinBlockSize = 4096; |
| const size_type kMaxBlockSize = 16384; |
| const size_type kDefaultBlockSize = kMinBlockSize; |
| |
| typedef boost::function<void(void)> free_func; |
| |
| /** |
| * Simple class to hold a functor that executes on delete |
| **/ |
| class CallOnDestroy { |
| public: |
| explicit CallOnDestroy(free_func func) : func_(std::move(func)) {} |
| ~CallOnDestroy() { |
| if (func_) { |
| func_(); |
| } |
| } |
| |
| private: |
| free_func func_; |
| }; |
| |
| /** |
| * \brief A chunk is the building block for buffers. |
| * |
| * A chunk is backed by a memory block, and internally it maintains information |
| * about which area of the block it may use, and the portion of this area that |
| * contains valid data. More than one chunk may share the same underlying |
| * block, but the areas should never overlap. Chunk holds a shared pointer to |
| * an array of bytes so that shared blocks are reference counted. |
| * |
| * When a chunk is copied, the copy shares the same underlying buffer, but the |
| * copy receives its own copies of the start/cursor/end pointers, so each copy |
| * can be manipulated independently. This allows different buffers to share |
| * the same non-overlapping parts of a chunk, or even overlapping parts of a |
| * chunk if the situation arises. |
| * |
| **/ |
| |
| class Chunk { |
| |
| public: |
| /// Default constructor, allocates a new underlying block for this chunk. |
| explicit Chunk(size_type size) : underlyingBlock_(new data_type[size]), |
| readPos_(underlyingBlock_.get()), |
| writePos_(readPos_), |
| endPos_(readPos_ + size) {} |
| |
| /// Foreign buffer constructor, uses the supplied data for this chunk, and |
| /// only for reading. |
| Chunk(const data_type *data, size_type size, const free_func &func) : callOnDestroy_(new CallOnDestroy(func)), |
| readPos_(const_cast<data_type *>(data)), |
| writePos_(readPos_ + size), |
| endPos_(writePos_) {} |
| |
| private: |
| // reference counted object will call a functor when it's destroyed |
| boost::shared_ptr<CallOnDestroy> callOnDestroy_; |
| |
| public: |
| /// Remove readable bytes from the front of the chunk by advancing the |
| /// chunk start position. |
| void truncateFront(size_type howMuch) { |
| readPos_ += howMuch; |
| assert(readPos_ <= writePos_); |
| } |
| |
| /// Remove readable bytes from the back of the chunk by moving the |
| /// chunk cursor position. |
| void truncateBack(size_type howMuch) { |
| writePos_ -= howMuch; |
| assert(readPos_ <= writePos_); |
| } |
| |
| /// Tell the position the next byte may be written to. |
| data_type *tellWritePos() const { |
| return writePos_; |
| } |
| |
| /// Tell the position of the first byte containing valid data. |
| const data_type *tellReadPos() const { |
| return readPos_; |
| } |
| |
| /// After a write operation, increment the write position. |
| void incrementCursor(size_type howMuch) { |
| writePos_ += howMuch; |
| assert(writePos_ <= endPos_); |
| } |
| |
| /// Tell how many bytes of data were written to this chunk. |
| size_type dataSize() const { |
| return (writePos_ - readPos_); |
| } |
| |
| /// Tell how many bytes this chunk has available to write to. |
| size_type freeSize() const { |
| return (endPos_ - writePos_); |
| } |
| |
| /// Tell how many bytes of data this chunk can hold (used and free). |
| size_type capacity() const { |
| return (endPos_ - readPos_); |
| } |
| |
| private: |
| friend bool operator==(const Chunk &lhs, const Chunk &rhs); |
| friend bool operator!=(const Chunk &lhs, const Chunk &rhs); |
| |
| // more than one buffer can share an underlying block, so use SharedPtr |
| boost::shared_array<data_type> underlyingBlock_; |
| |
| data_type *readPos_; ///< The first readable byte in the block |
| data_type *writePos_; ///< The end of written data and start of free space |
| data_type *endPos_; ///< Marks the end of the usable block area |
| }; |
| |
| /** |
| * Compare underlying buffers and return true if they are equal |
| **/ |
| inline bool operator==(const Chunk &lhs, const Chunk &rhs) { |
| return lhs.underlyingBlock_ == rhs.underlyingBlock_; |
| } |
| |
| /** |
| * Compare underlying buffers and return true if they are unequal |
| **/ |
| inline bool operator!=(const Chunk &lhs, const Chunk &rhs) { |
| return lhs.underlyingBlock_ != rhs.underlyingBlock_; |
| } |
| |
| /** |
| * \brief Implementation details for Buffer class |
| * |
| * Internally, BufferImpl keeps two lists of chunks, one list consists entirely of |
| * chunks containing data, and one list which contains chunks with free space. |
| * |
| * |
| */ |
| |
| class BufferImpl : boost::noncopyable { |
| |
| /// Add a new chunk to the list of chunks for this buffer, growing the |
| /// buffer by the default block size. |
| void allocChunkChecked(size_type size = kDefaultBlockSize) { |
| writeChunks_.push_back(Chunk(size)); |
| freeSpace_ += writeChunks_.back().freeSize(); |
| } |
| |
| /// Add a new chunk to the list of chunks for this buffer, growing the |
| /// buffer by the requested size, but within the range of a minimum and |
| /// maximum. |
| void allocChunk(size_type size) { |
| if (size < kMinBlockSize) { |
| size = kMinBlockSize; |
| } else if (size > kMaxBlockSize) { |
| size = kMaxBlockSize; |
| } |
| allocChunkChecked(size); |
| } |
| |
| /// Update the state of the chunks after a write operation. This function |
| /// ensures the chunk states are consistent with the write. |
| void postWrite(size_type size) { |
| |
| // precondition to this function is that the writeChunk_.front() |
| // contains the data that was just written, so make sure writeChunks_ |
| // is not empty: |
| |
| assert(size <= freeSpace_ && !writeChunks_.empty()); |
| |
| // This is probably the one tricky part of BufferImpl. The data that |
| // was written now exists in writeChunks_.front(). Now we must make |
| // sure that same data exists in readChunks_.back(). |
| // |
| // There are two cases: |
| // |
| // 1. readChunks_.last() and writeChunk_.front() refer to the same |
| // underlying block, in which case they both just need their cursor |
| // updated to reflect the new state. |
| // |
| // 2. readChunk_.last() is not the same block as writeChunks_.front(), |
| // in which case it should be, since the writeChunk.front() contains |
| // the next bit of data that will be appended to readChunks_, and |
| // therefore needs to be copied there so we can proceed with updating |
| // their state. |
| // |
| |
| // if readChunks_ is not the same as writeChunks_.front(), make a copy |
| // of it there |
| |
| if (readChunks_.empty() || (readChunks_.back() != writeChunks_.front())) { |
| const Chunk &curChunk = writeChunks_.front(); |
| readChunks_.push_back(curChunk); |
| |
| // Any data that existed in the write chunk previously doesn't |
| // belong to this buffer (otherwise it would have already been |
| // added to the readChunk_ list). Here, adjust the start of the |
| // readChunk to begin after any data already existing in curChunk |
| |
| readChunks_.back().truncateFront(curChunk.dataSize()); |
| } |
| |
| assert(readChunks_.back().freeSize() == writeChunks_.front().freeSize()); |
| |
| // update the states of both readChunks_ and writeChunks_ to indicate that they are |
| // holding the new data |
| |
| readChunks_.back().incrementCursor(size); |
| writeChunks_.front().incrementCursor(size); |
| size_ += size; |
| freeSpace_ -= size; |
| |
| // if there is no more free space in writeChunks_, the next write cannot use |
| // it, so dispose of it now |
| |
| if (writeChunks_.front().freeSize() == 0) { |
| writeChunks_.pop_front(); |
| } |
| } |
| |
| public: |
| typedef std::deque<Chunk> ChunkList; |
| typedef boost::shared_ptr<BufferImpl> SharedPtr; |
| typedef boost::shared_ptr<const BufferImpl> ConstSharedPtr; |
| |
| /// Default constructor, creates a buffer without any chunks |
| BufferImpl() : freeSpace_(0), |
| size_(0) {} |
| |
| /// Copy constructor, gets a copy of all the chunks with data. |
| BufferImpl(const BufferImpl &src) : readChunks_(src.readChunks_), |
| freeSpace_(0), |
| size_(src.size_) {} |
| |
| /// Amount of data held in this buffer. |
| size_type size() const { |
| return size_; |
| } |
| |
| /// Capacity that may be written before the buffer must allocate more memory. |
| size_type freeSpace() const { |
| return freeSpace_; |
| } |
| |
| /// Add enough free chunks to make the reservation size available. |
| /// Actual amount may be more (rounded up to next chunk). |
| void reserveFreeSpace(size_type reserveSize) { |
| while (freeSpace_ < reserveSize) { |
| allocChunk(reserveSize - freeSpace_); |
| } |
| } |
| |
| /// Return the chunk avro's begin iterator for reading. |
| ChunkList::const_iterator beginRead() const { |
| return readChunks_.begin(); |
| } |
| |
| /// Return the chunk avro's end iterator for reading. |
| ChunkList::const_iterator endRead() const { |
| return readChunks_.end(); |
| } |
| |
| /// Return the chunk avro's begin iterator for writing. |
| ChunkList::const_iterator beginWrite() const { |
| return writeChunks_.begin(); |
| } |
| |
| /// Return the chunk avro's end iterator for writing. |
| ChunkList::const_iterator endWrite() const { |
| return writeChunks_.end(); |
| } |
| |
| /// Write a single value to buffer, add a new chunk if necessary. |
| template<typename T> |
| void writeTo(T val, const std::true_type &) { |
| if (freeSpace_ && (sizeof(T) <= writeChunks_.front().freeSize())) { |
| // fast path, there's enough room in the writeable chunk to just |
| // straight out copy it |
| *(reinterpret_cast<T *>(writeChunks_.front().tellWritePos())) = val; |
| postWrite(sizeof(T)); |
| } else { |
| // need to fixup chunks first, so use the regular memcpy |
| // writeTo method |
| writeTo(reinterpret_cast<data_type *>(&val), sizeof(T)); |
| } |
| } |
| |
| /// An uninstantiable function, this is if boost::is_fundamental check fails, |
| /// and will compile-time assert. |
| template<typename T> |
| void writeTo(T /*val*/, const std::false_type &) { |
| BOOST_STATIC_ASSERT(sizeof(T) == 0); |
| } |
| |
| /// Write a block of data to the buffer, adding new chunks if necessary. |
| size_type writeTo(const data_type *data, size_type size) { |
| size_type bytesLeft = size; |
| while (bytesLeft) { |
| |
| if (freeSpace_ == 0) { |
| allocChunkChecked(); |
| } |
| |
| Chunk &chunk = writeChunks_.front(); |
| size_type toCopy = std::min<size_type>(chunk.freeSize(), bytesLeft); |
| assert(toCopy); |
| memcpy(chunk.tellWritePos(), data, toCopy); |
| postWrite(toCopy); |
| data += toCopy; |
| bytesLeft -= toCopy; |
| } |
| return size; |
| } |
| |
| /// Update internal status of chunks after data is written using iterator. |
| size_type wroteTo(size_type size) { |
| assert(size <= freeSpace_); |
| size_type bytesLeft = size; |
| while (bytesLeft) { |
| |
| Chunk &chunk = writeChunks_.front(); |
| size_type wrote = std::min<size_type>(chunk.freeSize(), bytesLeft); |
| assert(wrote); |
| postWrite(wrote); |
| bytesLeft -= wrote; |
| } |
| return size; |
| } |
| |
| /// Append the chunks that have data in src to this buffer |
| void append(const BufferImpl &src) { |
| std::copy(src.readChunks_.begin(), src.readChunks_.end(), std::back_inserter(readChunks_)); |
| size_ += src.size_; |
| } |
| |
| /// Remove all the chunks that contain data from this buffer. |
| void discardData() { |
| readChunks_.clear(); |
| size_ = 0; |
| } |
| |
| /// Remove the specified amount of data from the chunks, starting at the front. |
| void discardData(size_type bytes) { |
| assert(bytes && bytes <= size_); |
| |
| size_type bytesToDiscard = bytes; |
| while (bytesToDiscard) { |
| |
| size_t currentSize = readChunks_.front().dataSize(); |
| |
| // see if entire chunk is discarded |
| if (currentSize <= bytesToDiscard) { |
| readChunks_.pop_front(); |
| bytesToDiscard -= currentSize; |
| } else { |
| readChunks_.front().truncateFront(bytesToDiscard); |
| bytesToDiscard = 0; |
| } |
| } |
| |
| size_ -= bytes; |
| } |
| |
| /// Remove the specified amount of data from the chunks, moving the |
| /// data to dest's chunks |
| void extractData(BufferImpl &dest, size_type bytes) { |
| assert(bytes && bytes <= size_); |
| |
| size_type bytesToExtract = bytes; |
| while (bytesToExtract) { |
| |
| size_t currentSize = readChunks_.front().dataSize(); |
| dest.readChunks_.push_back(readChunks_.front()); |
| |
| // see if entire chunk was extracted |
| if (currentSize <= bytesToExtract) { |
| readChunks_.pop_front(); |
| bytesToExtract -= currentSize; |
| } else { |
| readChunks_.front().truncateFront(bytesToExtract); |
| size_t excess = currentSize - bytesToExtract; |
| dest.readChunks_.back().truncateBack(excess); |
| bytesToExtract = 0; |
| } |
| } |
| |
| size_ -= bytes; |
| dest.size_ += bytes; |
| } |
| |
| /// Move data from this to the destination, leaving this buffer without data |
| void extractData(BufferImpl &dest) { |
| assert(dest.readChunks_.empty()); |
| dest.readChunks_.swap(readChunks_); |
| dest.size_ = size_; |
| size_ = 0; |
| } |
| |
| /// Copy data to a different buffer by copying the chunks. It's |
| /// a bit like extract, but without modifying the source buffer. |
| static void copyData(BufferImpl &dest, |
| ChunkList::const_iterator iter, |
| size_type offset, |
| size_type bytes) { |
| // now we are positioned to start the copying, copy as many |
| // chunks as we need, the first chunk may have a non-zero offset |
| // if the data to copy is not at the start of the chunk |
| size_type copied = 0; |
| while (copied < bytes) { |
| |
| dest.readChunks_.push_back(*iter); |
| |
| // offset only applies in the first chunk, |
| // all subsequent chunks are copied from the start |
| dest.readChunks_.back().truncateFront(offset); |
| offset = 0; |
| |
| copied += dest.readChunks_.back().dataSize(); |
| ++iter; |
| } |
| |
| // if the last chunk copied has more bytes than we need, truncate it |
| size_type excess = copied - bytes; |
| dest.readChunks_.back().truncateBack(excess); |
| |
| dest.size_ += bytes; |
| } |
| |
| /// The number of chunks containing data. Used for debugging. |
| int numDataChunks() const { |
| return readChunks_.size(); |
| } |
| |
| /// The number of chunks containing free space (note that an entire chunk |
| /// may not be free). Used for debugging. |
| int numFreeChunks() const { |
| return writeChunks_.size(); |
| } |
| |
| /// Add unmanaged data to the buffer. The buffer will not automatically |
| /// free the data, but it will call the supplied function when the data is |
| /// no longer referenced by the buffer (or copies of the buffer). |
| void appendForeignData(const data_type *data, size_type size, const free_func &func) { |
| readChunks_.push_back(Chunk(data, size, func)); |
| size_ += size; |
| } |
| BufferImpl &operator=(const BufferImpl &src) = delete; |
| |
| private: |
| ChunkList readChunks_; ///< chunks of this buffer containing data |
| ChunkList writeChunks_; ///< chunks of this buffer containing free space |
| |
| size_type freeSpace_; ///< capacity of buffer before allocation required |
| size_type size_; ///< amount of data in buffer |
| }; |
| |
| } // namespace detail |
| |
| } // namespace avro |
| |
| #endif |