| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <cassert> |
| #include <algorithm> |
| |
| #include <thrift/transport/TBufferTransports.h> |
| |
| using std::string; |
| |
| namespace apache { |
| namespace thrift { |
| namespace transport { |
| |
| uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) { |
| auto have = static_cast<uint32_t>(rBound_ - rBase_); |
| |
| // We should only take the slow path if we can't satisfy the read |
| // with the data already in the buffer. |
| assert(have < len); |
| |
| // If we have some data in the buffer, copy it out and return it. |
| // We have to return it without attempting to read more, since we aren't |
| // guaranteed that the underlying transport actually has more data, so |
| // attempting to read from it could block. |
| if (have > 0) { |
| memcpy(buf, rBase_, have); |
| setReadBuffer(rBuf_.get(), 0); |
| return have; |
| } |
| |
| // No data is available in our buffer. |
| // Get more from underlying transport up to buffer size. |
| // Note that this makes a lot of sense if len < rBufSize_ |
| // and almost no sense otherwise. TODO(dreiss): Fix that |
| // case (possibly including some readv hotness). |
| setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); |
| |
| // Hand over whatever we have. |
| uint32_t give = (std::min)(len, static_cast<uint32_t>(rBound_ - rBase_)); |
| memcpy(buf, rBase_, give); |
| rBase_ += give; |
| |
| return give; |
| } |
| |
| void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) { |
| auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get()); |
| auto space = static_cast<uint32_t>(wBound_ - wBase_); |
| // We should only take the slow path if we can't accommodate the write |
| // with the free space already in the buffer. |
| assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len)); |
| |
| // Now here's the tricky question: should we copy data from buf into our |
| // internal buffer and write it from there, or should we just write out |
| // the current internal buffer in one syscall and write out buf in another. |
| // If our currently buffered data plus buf is at least double our buffer |
| // size, we will have to do two syscalls no matter what (except in the |
| // degenerate case when our buffer is empty), so there is no use copying. |
| // Otherwise, there is sort of a sliding scale. If we have N-1 bytes |
| // buffered and need to write 2, it would be crazy to do two syscalls. |
| // On the other hand, if we have 2 bytes buffered and are writing 2N-3, |
| // we can save a syscall in the short term by loading up our buffer, writing |
| // it out, and copying the rest of the bytes into our buffer. Of course, |
| // if we get another 2-byte write, we haven't saved any syscalls at all, |
| // and have just copied nearly 2N bytes for nothing. Finding a perfect |
| // policy would require predicting the size of future writes, so we're just |
| // going to always eschew syscalls if we have less than 2N bytes to write. |
| |
| // The case where we have to do two syscalls. |
| // This case also covers the case where the buffer is empty, |
| // but it is clearer (I think) to think of it as two separate cases. |
| if ((have_bytes + len >= 2 * wBufSize_) || (have_bytes == 0)) { |
| // TODO(dreiss): writev |
| if (have_bytes > 0) { |
| transport_->write(wBuf_.get(), have_bytes); |
| } |
| transport_->write(buf, len); |
| wBase_ = wBuf_.get(); |
| return; |
| } |
| |
| // Fill up our internal buffer for a write. |
| memcpy(wBase_, buf, space); |
| buf += space; |
| len -= space; |
| transport_->write(wBuf_.get(), wBufSize_); |
| |
| // Copy the rest into our buffer. |
| assert(len < wBufSize_); |
| memcpy(wBuf_.get(), buf, len); |
| wBase_ = wBuf_.get() + len; |
| return; |
| } |
| |
| const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { |
| (void)buf; |
| (void)len; |
| // Simply return nullptr. We don't know if there is actually data available on |
| // the underlying transport, so calling read() might block. |
| return nullptr; |
| } |
| |
| void TBufferedTransport::flush() { |
| resetConsumedMessageSize(); |
| // Write out any data waiting in the write buffer. |
| auto have_bytes = static_cast<uint32_t>(wBase_ - wBuf_.get()); |
| if (have_bytes > 0) { |
| // Note that we reset wBase_ prior to the underlying write |
| // to ensure we're in a sane state (i.e. internal buffer cleaned) |
| // if the underlying write throws up an exception |
| wBase_ = wBuf_.get(); |
| transport_->write(wBuf_.get(), have_bytes); |
| } |
| |
| // Flush the underlying transport. |
| transport_->flush(); |
| } |
| |
| uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) { |
| uint32_t want = len; |
| auto have = static_cast<uint32_t>(rBound_ - rBase_); |
| |
| // We should only take the slow path if we can't satisfy the read |
| // with the data already in the buffer. |
| assert(have < want); |
| |
| // If we have some data in the buffer, copy it out and return it. |
| // We have to return it without attempting to read more, since we aren't |
| // guaranteed that the underlying transport actually has more data, so |
| // attempting to read from it could block. |
| if (have > 0) { |
| memcpy(buf, rBase_, have); |
| setReadBuffer(rBuf_.get(), 0); |
| return have; |
| } |
| |
| // Read another frame. |
| if (!readFrame()) { |
| // EOF. No frame available. |
| return 0; |
| } |
| |
| // TODO(dreiss): Should we warn when reads cross frames? |
| |
| // Hand over whatever we have. |
| uint32_t give = (std::min)(want, static_cast<uint32_t>(rBound_ - rBase_)); |
| memcpy(buf, rBase_, give); |
| rBase_ += give; |
| want -= give; |
| |
| return (len - want); |
| } |
| |
| bool TFramedTransport::readFrame() { |
| // TODO(dreiss): Think about using readv here, even though it would |
| // result in (gasp) read-ahead. |
| |
| // Read the size of the next frame. |
| // We can't use readAll(&sz, sizeof(sz)), since that always throws an |
| // exception on EOF. We want to throw an exception only if EOF occurs after |
| // partial size data. |
| int32_t sz = -1; |
| uint32_t size_bytes_read = 0; |
| while (size_bytes_read < sizeof(sz)) { |
| uint8_t* szp = reinterpret_cast<uint8_t*>(&sz) + size_bytes_read; |
| uint32_t bytes_read |
| = transport_->read(szp, static_cast<uint32_t>(sizeof(sz)) - size_bytes_read); |
| if (bytes_read == 0) { |
| if (size_bytes_read == 0) { |
| // EOF before any data was read. |
| return false; |
| } else { |
| // EOF after a partial frame header. Raise an exception. |
| throw TTransportException(TTransportException::END_OF_FILE, |
| "No more data to read after " |
| "partial frame header."); |
| } |
| } |
| size_bytes_read += bytes_read; |
| } |
| |
| sz = ntohl(sz); |
| |
| if (sz < 0) { |
| throw TTransportException("Frame size has negative value"); |
| } |
| |
| // Check for oversized frame |
| if (sz > static_cast<int32_t>(maxFrameSize_)) |
| throw TTransportException(TTransportException::CORRUPTED_DATA, "Received an oversized frame"); |
| |
| // Read the frame payload, and reset markers. |
| if (sz > static_cast<int32_t>(rBufSize_)) { |
| rBuf_.reset(new uint8_t[sz]); |
| rBufSize_ = sz; |
| } |
| transport_->readAll(rBuf_.get(), sz); |
| setReadBuffer(rBuf_.get(), sz); |
| return true; |
| } |
| |
| void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) { |
| // Double buffer size until sufficient. |
| auto have = static_cast<uint32_t>(wBase_ - wBuf_.get()); |
| uint32_t new_size = wBufSize_; |
| if (len + have < have /* overflow */ || len + have > 0x7fffffff) { |
| throw TTransportException(TTransportException::BAD_ARGS, |
| "Attempted to write over 2 GB to TFramedTransport."); |
| } |
| while (new_size < len + have) { |
| new_size = new_size > 0 ? new_size * 2 : 1; |
| } |
| |
| // TODO(dreiss): Consider modifying this class to use malloc/free |
| // so we can use realloc here. |
| |
| // Allocate new buffer. |
| auto* new_buf = new uint8_t[new_size]; |
| |
| // Copy the old buffer to the new one. |
| memcpy(new_buf, wBuf_.get(), have); |
| |
| // Now point buf to the new one. |
| wBuf_.reset(new_buf); |
| wBufSize_ = new_size; |
| wBase_ = wBuf_.get() + have; |
| wBound_ = wBuf_.get() + wBufSize_; |
| |
| // Copy the data into the new buffer. |
| memcpy(wBase_, buf, len); |
| wBase_ += len; |
| } |
| |
| void TFramedTransport::flush() { |
| resetConsumedMessageSize(); |
| int32_t sz_hbo, sz_nbo; |
| assert(wBufSize_ > sizeof(sz_nbo)); |
| |
| // Slip the frame size into the start of the buffer. |
| sz_hbo = static_cast<uint32_t>(wBase_ - (wBuf_.get() + sizeof(sz_nbo))); |
| sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo)); |
| memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo)); |
| |
| if (sz_hbo > 0) { |
| // Note that we reset wBase_ (with a pad for the frame size) |
| // prior to the underlying write to ensure we're in a sane state |
| // (i.e. internal buffer cleaned) if the underlying write throws |
| // up an exception |
| wBase_ = wBuf_.get() + sizeof(sz_nbo); |
| |
| // Write size and frame body. |
| transport_->write(wBuf_.get(), static_cast<uint32_t>(sizeof(sz_nbo)) + sz_hbo); |
| } |
| |
| // Flush the underlying transport. |
| transport_->flush(); |
| |
| // reclaim write buffer |
| if (wBufSize_ > bufReclaimThresh_) { |
| wBufSize_ = DEFAULT_BUFFER_SIZE; |
| wBuf_.reset(new uint8_t[wBufSize_]); |
| setWriteBuffer(wBuf_.get(), wBufSize_); |
| |
| // reset wBase_ with a pad for the frame size |
| int32_t pad = 0; |
| wBase_ = wBuf_.get() + sizeof(pad); |
| } |
| } |
| |
| uint32_t TFramedTransport::writeEnd() { |
| return static_cast<uint32_t>(wBase_ - wBuf_.get()); |
| } |
| |
| const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) { |
| (void)buf; |
| (void)len; |
| // Don't try to be clever with shifting buffers. |
| // If the fast path failed let the protocol use its slow path. |
| // Besides, who is going to try to borrow across messages? |
| return nullptr; |
| } |
| |
| uint32_t TFramedTransport::readEnd() { |
| // include framing bytes |
| auto bytes_read = static_cast<uint32_t>(rBound_ - rBuf_.get() + sizeof(uint32_t)); |
| |
| if (rBufSize_ > bufReclaimThresh_) { |
| rBufSize_ = 0; |
| rBuf_.reset(); |
| setReadBuffer(rBuf_.get(), rBufSize_); |
| } |
| |
| return bytes_read; |
| } |
| |
| void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) { |
| // Correct rBound_ so we can use the fast path in the future. |
| rBound_ = wBase_; |
| |
| // Decide how much to give. |
| uint32_t give = (std::min)(len, available_read()); |
| |
| *out_start = rBase_; |
| *out_give = give; |
| |
| // Preincrement rBase_ so the caller doesn't have to. |
| rBase_ += give; |
| } |
| |
| uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) { |
| uint8_t* start; |
| uint32_t give; |
| computeRead(len, &start, &give); |
| |
| // Copy into the provided buffer. |
| memcpy(buf, start, give); |
| |
| return give; |
| } |
| |
| uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) { |
| // Don't get some stupid assertion failure. |
| if (buffer_ == nullptr) { |
| return 0; |
| } |
| |
| uint8_t* start; |
| uint32_t give; |
| computeRead(len, &start, &give); |
| |
| // Append to the provided string. |
| str.append((char*)start, give); |
| |
| return give; |
| } |
| |
| void TMemoryBuffer::ensureCanWrite(uint32_t len) { |
| // Check available space |
| uint32_t avail = available_write(); |
| if (len <= avail) { |
| return; |
| } |
| |
| if (!owner_) { |
| throw TTransportException("Insufficient space in external MemoryBuffer"); |
| } |
| |
| // Grow the buffer as necessary. |
| const uint32_t current_used = bufferSize_ - avail; |
| const uint32_t required_buffer_size = len + current_used; |
| if (required_buffer_size > maxBufferSize_) { |
| throw TTransportException(TTransportException::BAD_ARGS, |
| "Internal buffer size overflow when requesting a buffer of size " + std::to_string(required_buffer_size)); |
| } |
| |
| // Always grow to the next bigger power of two: |
| const double suggested_buffer_size = std::exp2(std::ceil(std::log2(required_buffer_size))); |
| // Unless the power of two exceeds maxBufferSize_: |
| const uint64_t new_size = static_cast<uint64_t>((std::min)(suggested_buffer_size, static_cast<double>(maxBufferSize_))); |
| |
| // Allocate into a new pointer so we don't bork ours if it fails. |
| auto* new_buffer = static_cast<uint8_t*>(std::realloc(buffer_, static_cast<std::size_t>(new_size))); |
| if (new_buffer == nullptr) { |
| throw std::bad_alloc(); |
| } |
| |
| rBase_ = new_buffer + (rBase_ - buffer_); |
| rBound_ = new_buffer + (rBound_ - buffer_); |
| wBase_ = new_buffer + (wBase_ - buffer_); |
| wBound_ = new_buffer + new_size; |
| // Note: with realloc() we do not need to free the previous buffer: |
| buffer_ = new_buffer; |
| bufferSize_ = static_cast<uint32_t>(new_size); |
| } |
| |
| void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) { |
| ensureCanWrite(len); |
| |
| // Copy into the buffer and increment wBase_. |
| memcpy(wBase_, buf, len); |
| wBase_ += len; |
| } |
| |
| void TMemoryBuffer::wroteBytes(uint32_t len) { |
| uint32_t avail = available_write(); |
| if (len > avail) { |
| throw TTransportException("Client wrote more bytes than size of buffer."); |
| } |
| wBase_ += len; |
| } |
| |
| const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) { |
| (void)buf; |
| rBound_ = wBase_; |
| if (available_read() >= *len) { |
| *len = available_read(); |
| return rBase_; |
| } |
| return nullptr; |
| } |
| } |
| } |
| } // apache::thrift::transport |