| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |
| #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 |
| |
| #include <cstring> |
| #include "boost/scoped_array.hpp" |
| |
| #include <transport/TTransport.h> |
| |
// Branch-prediction hints used by the buffer fast paths.
//
// __builtin_expect() takes a long and compares it against the expected
// constant, so the argument is normalized with !! first.  That way any
// truthy scalar (an integer other than 1, or a pointer) is hinted
// correctly, and pointer arguments compile at all.
//
// On compilers without __builtin_expect the macros are plain pass-throughs.
#if defined(__GNUC__)
#define TDB_LIKELY(val) (__builtin_expect(!!(val), 1))
#define TDB_UNLIKELY(val) (__builtin_expect(!!(val), 0))
#else
#define TDB_LIKELY(val) (val)
#define TDB_UNLIKELY(val) (val)
#endif
| |
| namespace apache { namespace thrift { namespace transport { |
| |
| |
| /** |
| * Base class for all transports that use read/write buffers for performance. |
| * |
| * TBufferBase is designed to implement the fast-path "memcpy" style |
| * operations that work in the common case. It does so with small and |
| * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract |
| * class. Subclasses are expected to define the "slow path" operations |
| * that have to be done when the buffers are full or empty. |
| * |
| */ |
| class TBufferBase : public TTransport { |
| |
| public: |
| |
| /** |
| * Fast-path read. |
| * |
| * When we have enough data buffered to fulfill the read, we can satisfy it |
| * with a single memcpy, then adjust our internal pointers. If the buffer |
| * is empty, we call out to our slow path, implemented by a subclass. |
| * This method is meant to eventually be nonvirtual and inlinable. |
| */ |
| uint32_t read(uint8_t* buf, uint32_t len) { |
| uint8_t* new_rBase = rBase_ + len; |
| if (TDB_LIKELY(new_rBase <= rBound_)) { |
| std::memcpy(buf, rBase_, len); |
| rBase_ = new_rBase; |
| return len; |
| } |
| return readSlow(buf, len); |
| } |
| |
| /** |
| * Fast-path write. |
| * |
| * When we have enough empty space in our buffer to accomodate the write, we |
| * can satisfy it with a single memcpy, then adjust our internal pointers. |
| * If the buffer is full, we call out to our slow path, implemented by a |
| * subclass. This method is meant to eventually be nonvirtual and |
| * inlinable. |
| */ |
| void write(const uint8_t* buf, uint32_t len) { |
| uint8_t* new_wBase = wBase_ + len; |
| if (TDB_LIKELY(new_wBase <= wBound_)) { |
| std::memcpy(wBase_, buf, len); |
| wBase_ = new_wBase; |
| return; |
| } |
| writeSlow(buf, len); |
| } |
| |
| /** |
| * Fast-path borrow. A lot like the fast-path read. |
| */ |
| const uint8_t* borrow(uint8_t* buf, uint32_t* len) { |
| if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { |
| // With strict aliasing, writing to len shouldn't force us to |
| // refetch rBase_ from memory. TODO(dreiss): Verify this. |
| *len = rBound_ - rBase_; |
| return rBase_; |
| } |
| return borrowSlow(buf, len); |
| } |
| |
| /** |
| * Consume doesn't require a slow path. |
| */ |
| void consume(uint32_t len) { |
| if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { |
| rBase_ += len; |
| } else { |
| throw TTransportException(TTransportException::BAD_ARGS, |
| "consume did not follow a borrow."); |
| } |
| } |
| |
| |
| protected: |
| |
| /// Slow path read. |
| virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; |
| |
| /// Slow path write. |
| virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; |
| |
| /** |
| * Slow path borrow. |
| * |
| * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len |
| */ |
| virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; |
| |
| /** |
| * Trivial constructor. |
| * |
| * Initialize pointers safely. Constructing is not a very |
| * performance-sensitive operation, so it is okay to just leave it to |
| * the concrete class to set up pointers correctly. |
| */ |
| TBufferBase() |
| : rBase_(NULL) |
| , rBound_(NULL) |
| , wBase_(NULL) |
| , wBound_(NULL) |
| {} |
| |
| /// Convenience mutator for setting the read buffer. |
| void setReadBuffer(uint8_t* buf, uint32_t len) { |
| rBase_ = buf; |
| rBound_ = buf+len; |
| } |
| |
| /// Convenience mutator for setting the write buffer. |
| void setWriteBuffer(uint8_t* buf, uint32_t len) { |
| wBase_ = buf; |
| wBound_ = buf+len; |
| } |
| |
| virtual ~TBufferBase() {} |
| |
| /// Reads begin here. |
| uint8_t* rBase_; |
| /// Reads may extend to just before here. |
| uint8_t* rBound_; |
| |
| /// Writes begin here. |
| uint8_t* wBase_; |
| /// Writes may extend to just before here. |
| uint8_t* wBound_; |
| }; |
| |
| |
| /** |
| * Base class for all transport which wraps transport to new one. |
| */ |
| class TUnderlyingTransport : public TBufferBase { |
| public: |
| static const int DEFAULT_BUFFER_SIZE = 512; |
| |
| virtual bool peek() { |
| return (rBase_ < rBound_) || transport_->peek(); |
| } |
| |
| void open() { |
| transport_->open(); |
| } |
| |
| bool isOpen() { |
| return transport_->isOpen(); |
| } |
| |
| void close() { |
| flush(); |
| transport_->close(); |
| } |
| |
| boost::shared_ptr<TTransport> getUnderlyingTransport() { |
| return transport_; |
| } |
| |
| protected: |
| boost::shared_ptr<TTransport> transport_; |
| |
| uint32_t rBufSize_; |
| uint32_t wBufSize_; |
| boost::scoped_array<uint8_t> rBuf_; |
| boost::scoped_array<uint8_t> wBuf_; |
| |
| TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) |
| : transport_(transport) |
| , rBufSize_(sz) |
| , wBufSize_(sz) |
| , rBuf_(new uint8_t[rBufSize_]) |
| , wBuf_(new uint8_t[wBufSize_]) {} |
| |
| TUnderlyingTransport(boost::shared_ptr<TTransport> transport) |
| : transport_(transport) |
| , rBufSize_(DEFAULT_BUFFER_SIZE) |
| , wBufSize_(DEFAULT_BUFFER_SIZE) |
| , rBuf_(new uint8_t[rBufSize_]) |
| , wBuf_(new uint8_t[wBufSize_]) {} |
| |
| TUnderlyingTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) |
| : transport_(transport) |
| , rBufSize_(rsz) |
| , wBufSize_(wsz) |
| , rBuf_(new uint8_t[rBufSize_]) |
| , wBuf_(new uint8_t[wBufSize_]) {} |
| }; |
| |
| /** |
| * Buffered transport. For reads it will read more data than is requested |
| * and will serve future data out of a local buffer. For writes, data is |
| * stored to an in memory buffer before being written out. |
| * |
| */ |
| class TBufferedTransport : public TUnderlyingTransport { |
| public: |
| |
| /// Use default buffer sizes. |
| TBufferedTransport(boost::shared_ptr<TTransport> transport) |
| : TUnderlyingTransport(transport) |
| { |
| initPointers(); |
| } |
| |
| /// Use specified buffer sizes. |
| TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) |
| : TUnderlyingTransport(transport, sz) |
| { |
| initPointers(); |
| } |
| |
| /// Use specified read and write buffer sizes. |
| TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) |
| : TUnderlyingTransport(transport, rsz, wsz) |
| { |
| initPointers(); |
| } |
| |
| virtual bool peek() { |
| /* shigin: see THRIFT-96 discussion */ |
| if (rBase_ == rBound_) { |
| setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); |
| } |
| return (rBound_ > rBase_); |
| } |
| virtual uint32_t readSlow(uint8_t* buf, uint32_t len); |
| |
| virtual void writeSlow(const uint8_t* buf, uint32_t len); |
| |
| void flush(); |
| |
| |
| /** |
| * The following behavior is currently implemented by TBufferedTransport, |
| * but that may change in a future version: |
| * 1/ If len is at most rBufSize_, borrow will never return NULL. |
| * Depending on the underlying transport, it could throw an exception |
| * or hang forever. |
| * 2/ Some borrow requests may copy bytes internally. However, |
| * if len is at most rBufSize_/2, none of the copied bytes |
| * will ever have to be copied again. For optimial performance, |
| * stay under this limit. |
| */ |
| virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
| |
| protected: |
| void initPointers() { |
| setReadBuffer(rBuf_.get(), 0); |
| setWriteBuffer(wBuf_.get(), wBufSize_); |
| // Write size never changes. |
| } |
| }; |
| |
| |
| /** |
| * Wraps a transport into a buffered one. |
| * |
| */ |
| class TBufferedTransportFactory : public TTransportFactory { |
| public: |
| TBufferedTransportFactory() {} |
| |
| virtual ~TBufferedTransportFactory() {} |
| |
| /** |
| * Wraps the transport into a buffered one. |
| */ |
| virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) { |
| return boost::shared_ptr<TTransport>(new TBufferedTransport(trans)); |
| } |
| |
| }; |
| |
| |
| /** |
| * Framed transport. All writes go into an in-memory buffer until flush is |
| * called, at which point the transport writes the length of the entire |
| * binary chunk followed by the data payload. This allows the receiver on the |
| * other end to always do fixed-length reads. |
| * |
| */ |
| class TFramedTransport : public TUnderlyingTransport { |
| public: |
| |
| /// Use default buffer sizes. |
| TFramedTransport(boost::shared_ptr<TTransport> transport) |
| : TUnderlyingTransport(transport) |
| { |
| initPointers(); |
| } |
| |
| TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) |
| : TUnderlyingTransport(transport, sz) |
| { |
| initPointers(); |
| } |
| |
| virtual uint32_t readSlow(uint8_t* buf, uint32_t len); |
| |
| virtual void writeSlow(const uint8_t* buf, uint32_t len); |
| |
| virtual void flush(); |
| |
| const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
| |
| protected: |
| /** |
| * Reads a frame of input from the underlying stream. |
| */ |
| void readFrame(); |
| |
| void initPointers() { |
| setReadBuffer(NULL, 0); |
| setWriteBuffer(wBuf_.get(), wBufSize_); |
| |
| // Pad the buffer so we can insert the size later. |
| int32_t pad = 0; |
| this->write((uint8_t*)&pad, sizeof(pad)); |
| } |
| }; |
| |
| /** |
| * Wraps a transport into a framed one. |
| * |
| */ |
| class TFramedTransportFactory : public TTransportFactory { |
| public: |
| TFramedTransportFactory() {} |
| |
| virtual ~TFramedTransportFactory() {} |
| |
| /** |
| * Wraps the transport into a framed one. |
| */ |
| virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) { |
| return boost::shared_ptr<TTransport>(new TFramedTransport(trans)); |
| } |
| |
| }; |
| |
| |
| /** |
| * A memory buffer is a tranpsort that simply reads from and writes to an |
| * in memory buffer. Anytime you call write on it, the data is simply placed |
| * into a buffer, and anytime you call read, data is read from that buffer. |
| * |
| * The buffers are allocated using C constructs malloc,realloc, and the size |
| * doubles as necessary. We've considered using scoped |
| * |
| */ |
| class TMemoryBuffer : public TBufferBase { |
| private: |
| |
| // Common initialization done by all constructors. |
| void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { |
| if (buf == NULL && size != 0) { |
| assert(owner); |
| buf = (uint8_t*)std::malloc(size); |
| if (buf == NULL) { |
| throw TTransportException("Out of memory"); |
| } |
| } |
| |
| buffer_ = buf; |
| bufferSize_ = size; |
| |
| rBase_ = buffer_; |
| rBound_ = buffer_ + wPos; |
| // TODO(dreiss): Investigate NULL-ing this if !owner. |
| wBase_ = buffer_ + wPos; |
| wBound_ = buffer_ + bufferSize_; |
| |
| owner_ = owner; |
| |
| // rBound_ is really an artifact. In principle, it should always be |
| // equal to wBase_. We update it in a few places (computeRead, etc.). |
| } |
| |
| public: |
| static const uint32_t defaultSize = 1024; |
| |
| /** |
| * This enum specifies how a TMemoryBuffer should treat |
| * memory passed to it via constructors or resetBuffer. |
| * |
| * OBSERVE: |
| * TMemoryBuffer will simply store a pointer to the memory. |
| * It is the callers responsibility to ensure that the pointer |
| * remains valid for the lifetime of the TMemoryBuffer, |
| * and that it is properly cleaned up. |
| * Note that no data can be written to observed buffers. |
| * |
| * COPY: |
| * TMemoryBuffer will make an internal copy of the buffer. |
| * The caller has no responsibilities. |
| * |
| * TAKE_OWNERSHIP: |
| * TMemoryBuffer will become the "owner" of the buffer, |
| * and will be responsible for freeing it. |
| * The membory must have been allocated with malloc. |
| */ |
| enum MemoryPolicy |
| { OBSERVE = 1 |
| , COPY = 2 |
| , TAKE_OWNERSHIP = 3 |
| }; |
| |
| /** |
| * Construct a TMemoryBuffer with a default-sized buffer, |
| * owned by the TMemoryBuffer object. |
| */ |
| TMemoryBuffer() { |
| initCommon(NULL, defaultSize, true, 0); |
| } |
| |
| /** |
| * Construct a TMemoryBuffer with a buffer of a specified size, |
| * owned by the TMemoryBuffer object. |
| * |
| * @param sz The initial size of the buffer. |
| */ |
| TMemoryBuffer(uint32_t sz) { |
| initCommon(NULL, sz, true, 0); |
| } |
| |
| /** |
| * Construct a TMemoryBuffer with buf as its initial contents. |
| * |
| * @param buf The initial contents of the buffer. |
| * Note that, while buf is a non-const pointer, |
| * TMemoryBuffer will not write to it if policy == OBSERVE, |
| * so it is safe to const_cast<uint8_t*>(whatever). |
| * @param sz The size of @c buf. |
| * @param policy See @link MemoryPolicy @endlink . |
| */ |
| TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
| if (buf == NULL && sz != 0) { |
| throw TTransportException(TTransportException::BAD_ARGS, |
| "TMemoryBuffer given null buffer with non-zero size."); |
| } |
| |
| switch (policy) { |
| case OBSERVE: |
| case TAKE_OWNERSHIP: |
| initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); |
| break; |
| case COPY: |
| initCommon(NULL, sz, true, 0); |
| this->write(buf, sz); |
| break; |
| default: |
| throw TTransportException(TTransportException::BAD_ARGS, |
| "Invalid MemoryPolicy for TMemoryBuffer"); |
| } |
| } |
| |
| ~TMemoryBuffer() { |
| if (owner_) { |
| std::free(buffer_); |
| } |
| } |
| |
| bool isOpen() { |
| return true; |
| } |
| |
| bool peek() { |
| return (rBase_ < wBase_); |
| } |
| |
| void open() {} |
| |
| void close() {} |
| |
| // TODO(dreiss): Make bufPtr const. |
| void getBuffer(uint8_t** bufPtr, uint32_t* sz) { |
| *bufPtr = rBase_; |
| *sz = wBase_ - rBase_; |
| } |
| |
| std::string getBufferAsString() { |
| if (buffer_ == NULL) { |
| return ""; |
| } |
| uint8_t* buf; |
| uint32_t sz; |
| getBuffer(&buf, &sz); |
| return std::string((char*)buf, (std::string::size_type)sz); |
| } |
| |
| void appendBufferToString(std::string& str) { |
| if (buffer_ == NULL) { |
| return; |
| } |
| uint8_t* buf; |
| uint32_t sz; |
| getBuffer(&buf, &sz); |
| str.append((char*)buf, sz); |
| } |
| |
| void resetBuffer(bool reset_capacity = false) { |
| if (reset_capacity) |
| { |
| assert(owner_); |
| |
| void* new_buffer = std::realloc(buffer_, defaultSize); |
| |
| if (new_buffer == NULL) { |
| throw TTransportException("Out of memory."); |
| } |
| |
| buffer_ = (uint8_t*) new_buffer; |
| bufferSize_ = defaultSize; |
| |
| wBound_ = buffer_ + bufferSize_; |
| } |
| |
| rBase_ = buffer_; |
| rBound_ = buffer_; |
| wBase_ = buffer_; |
| // It isn't safe to write into a buffer we don't own. |
| if (!owner_) { |
| wBound_ = wBase_; |
| bufferSize_ = 0; |
| } |
| } |
| |
| /// See constructor documentation. |
| void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { |
| // Use a variant of the copy-and-swap trick for assignment operators. |
| // This is sub-optimal in terms of performance for two reasons: |
| // 1/ The constructing and swapping of the (small) values |
| // in the temporary object takes some time, and is not necessary. |
| // 2/ If policy == COPY, we allocate the new buffer before |
| // freeing the old one, precluding the possibility of |
| // reusing that memory. |
| // I doubt that either of these problems could be optimized away, |
| // but the second is probably no a common case, and the first is minor. |
| // I don't expect resetBuffer to be a common operation, so I'm willing to |
| // bite the performance bullet to make the method this simple. |
| |
| // Construct the new buffer. |
| TMemoryBuffer new_buffer(buf, sz, policy); |
| // Move it into ourself. |
| this->swap(new_buffer); |
| // Our old self gets destroyed. |
| } |
| |
| std::string readAsString(uint32_t len) { |
| std::string str; |
| (void)readAppendToString(str, len); |
| return str; |
| } |
| |
| uint32_t readAppendToString(std::string& str, uint32_t len); |
| |
| void readEnd() { |
| if (rBase_ == wBase_) { |
| resetBuffer(); |
| } |
| } |
| |
| uint32_t available_read() const { |
| // Remember, wBase_ is the real rBound_. |
| return wBase_ - rBase_; |
| } |
| |
| uint32_t available_write() const { |
| return wBound_ - wBase_; |
| } |
| |
| // Returns a pointer to where the client can write data to append to |
| // the TMemoryBuffer, and ensures the buffer is big enough to accomodate a |
| // write of the provided length. The returned pointer is very convenient for |
| // passing to read(), recv(), or similar. You must call wroteBytes() as soon |
| // as data is written or the buffer will not be aware that data has changed. |
| uint8_t* getWritePtr(uint32_t len) { |
| ensureCanWrite(len); |
| return wBase_; |
| } |
| |
| // Informs the buffer that the client has written 'len' bytes into storage |
| // that had been provided by getWritePtr(). |
| void wroteBytes(uint32_t len); |
| |
| protected: |
| void swap(TMemoryBuffer& that) { |
| using std::swap; |
| swap(buffer_, that.buffer_); |
| swap(bufferSize_, that.bufferSize_); |
| |
| swap(rBase_, that.rBase_); |
| swap(rBound_, that.rBound_); |
| swap(wBase_, that.wBase_); |
| swap(wBound_, that.wBound_); |
| |
| swap(owner_, that.owner_); |
| } |
| |
| // Make sure there's at least 'len' bytes available for writing. |
| void ensureCanWrite(uint32_t len); |
| |
| // Compute the position and available data for reading. |
| void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); |
| |
| uint32_t readSlow(uint8_t* buf, uint32_t len); |
| |
| void writeSlow(const uint8_t* buf, uint32_t len); |
| |
| const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); |
| |
| // Data buffer |
| uint8_t* buffer_; |
| |
| // Allocated buffer size |
| uint32_t bufferSize_; |
| |
| // Is this object the owner of the buffer? |
| bool owner_; |
| |
| // Don't forget to update constrctors, initCommon, and swap if |
| // you add new members. |
| }; |
| |
| }}} // apache::thrift::transport |
| |
| #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ |