blob: 7a7e5e928a1cb2172e6a2b0db3d3c2954e7f8ef6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <cassert>
#include <algorithm>
#include <transport/TBufferTransports.h>
using std::string;
namespace apache { namespace thrift { namespace transport {
uint32_t TBufferedTransport::readSlow(uint8_t* buf, uint32_t len) {
uint32_t want = len;
uint32_t have = rBound_ - rBase_;
// We should only take the slow path if we can't satisfy the read
// with the data already in the buffer.
assert(have < want);
// Copy out whatever we have.
if (have > 0) {
memcpy(buf, rBase_, have);
want -= have;
buf += have;
}
// Get more from underlying transport up to buffer size.
// Note that this makes a lot of sense if len < rBufSize_
// and almost no sense otherwise. TODO(dreiss): Fix that
// case (possibly including some readv hotness).
setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_));
// Hand over whatever we have.
uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
memcpy(buf, rBase_, give);
rBase_ += give;
want -= give;
return (len - want);
}
void TBufferedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
uint32_t have_bytes = wBase_ - wBuf_.get();
uint32_t space = wBound_ - wBase_;
// We should only take the slow path if we can't accomodate the write
// with the free space already in the buffer.
assert(wBound_ - wBase_ < static_cast<ptrdiff_t>(len));
// Now here's the tricky question: should we copy data from buf into our
// internal buffer and write it from there, or should we just write out
// the current internal buffer in one syscall and write out buf in another.
// If our currently buffered data plus buf is at least double our buffer
// size, we will have to do two syscalls no matter what (except in the
// degenerate case when our buffer is empty), so there is no use copying.
// Otherwise, there is sort of a sliding scale. If we have N-1 bytes
// buffered and need to write 2, it would be crazy to do two syscalls.
// On the other hand, if we have 2 bytes buffered and are writing 2N-3,
// we can save a syscall in the short term by loading up our buffer, writing
// it out, and copying the rest of the bytes into our buffer. Of course,
// if we get another 2-byte write, we haven't saved any syscalls at all,
// and have just copied nearly 2N bytes for nothing. Finding a perfect
// policy would require predicting the size of future writes, so we're just
// going to always eschew syscalls if we have less than 2N bytes to write.
// The case where we have to do two syscalls.
// This case also covers the case where the buffer is empty,
// but it is clearer (I think) to think of it as two separate cases.
if ((have_bytes + len >= 2*wBufSize_) || (have_bytes == 0)) {
// TODO(dreiss): writev
if (have_bytes > 0) {
transport_->write(wBuf_.get(), have_bytes);
}
transport_->write(buf, len);
wBase_ = wBuf_.get();
return;
}
// Fill up our internal buffer for a write.
memcpy(wBase_, buf, space);
buf += space;
len -= space;
transport_->write(wBuf_.get(), wBufSize_);
// Copy the rest into our buffer.
assert(len < wBufSize_);
memcpy(wBuf_.get(), buf, len);
wBase_ = wBuf_.get() + len;
return;
}
const uint8_t* TBufferedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
// If the request is bigger than our buffer, we are hosed.
if (*len > rBufSize_) {
return NULL;
}
// The number of bytes of data we have already.
uint32_t have = rBound_ - rBase_;
// The number of additional bytes we need from the underlying transport.
int32_t need = *len - have;
// The space from the start of the buffer to the end of our data.
uint32_t offset = rBound_ - rBuf_.get();
assert(need > 0);
// If we have less than half our buffer space available, shift the data
// we have down to the start. If the borrow is big compared to our buffer,
// this could be kind of a waste, but if the borrow is small, it frees up
// space at the end of our buffer to do a bigger single read from the
// underlying transport. Also, if our needs extend past the end of the
// buffer, we have to do a copy no matter what.
if ((offset > rBufSize_/2) || (offset + need > rBufSize_)) {
memmove(rBuf_.get(), rBase_, have);
setReadBuffer(rBuf_.get(), have);
}
// First try to fill up the buffer.
uint32_t got = transport_->read(rBound_, rBufSize_ - have);
rBound_ += got;
need -= got;
// If that fails, readAll until we get what we need.
if (need > 0) {
rBound_ += transport_->readAll(rBound_, need);
}
*len = rBound_ - rBase_;
return rBase_;
}
void TBufferedTransport::flush() {
// Write out any data waiting in the write buffer.
uint32_t have_bytes = wBase_ - wBuf_.get();
if (have_bytes > 0) {
// Note that we reset wBase_ prior to the underlying write
// to ensure we're in a sane state (i.e. internal buffer cleaned)
// if the underlying write throws up an exception
wBase_ = wBuf_.get();
transport_->write(wBuf_.get(), have_bytes);
}
// Flush the underlying transport.
transport_->flush();
}
uint32_t TFramedTransport::readSlow(uint8_t* buf, uint32_t len) {
uint32_t want = len;
uint32_t have = rBound_ - rBase_;
// We should only take the slow path if we can't satisfy the read
// with the data already in the buffer.
assert(have < want);
// Copy out whatever we have.
if (have > 0) {
memcpy(buf, rBase_, have);
want -= have;
buf += have;
}
// Read another frame.
readFrame();
// TODO(dreiss): Should we warn when reads cross frames?
// Hand over whatever we have.
uint32_t give = std::min(want, static_cast<uint32_t>(rBound_ - rBase_));
memcpy(buf, rBase_, give);
rBase_ += give;
want -= give;
return (len - want);
}
void TFramedTransport::readFrame() {
// TODO(dreiss): Think about using readv here, even though it would
// result in (gasp) read-ahead.
// Read the size of the next frame.
int32_t sz;
transport_->readAll((uint8_t*)&sz, sizeof(sz));
sz = ntohl(sz);
if (sz < 0) {
throw TTransportException("Frame size has negative value");
}
// Read the frame payload, and reset markers.
if (sz > static_cast<int32_t>(rBufSize_)) {
rBuf_.reset(new uint8_t[sz]);
rBufSize_ = sz;
}
transport_->readAll(rBuf_.get(), sz);
setReadBuffer(rBuf_.get(), sz);
}
void TFramedTransport::writeSlow(const uint8_t* buf, uint32_t len) {
// Double buffer size until sufficient.
uint32_t have = wBase_ - wBuf_.get();
while (wBufSize_ < len + have) {
wBufSize_ *= 2;
}
// TODO(dreiss): Consider modifying this class to use malloc/free
// so we can use realloc here.
// Allocate new buffer.
uint8_t* new_buf = new uint8_t[wBufSize_];
// Copy the old buffer to the new one.
memcpy(new_buf, wBuf_.get(), have);
// Now point buf to the new one.
wBuf_.reset(new_buf);
wBase_ = wBuf_.get() + have;
wBound_ = wBuf_.get() + wBufSize_;
// Copy the data into the new buffer.
memcpy(wBase_, buf, len);
wBase_ += len;
}
void TFramedTransport::flush() {
int32_t sz_hbo, sz_nbo;
assert(wBufSize_ > sizeof(sz_nbo));
// Slip the frame size into the start of the buffer.
sz_hbo = wBase_ - (wBuf_.get() + sizeof(sz_nbo));
sz_nbo = (int32_t)htonl((uint32_t)(sz_hbo));
memcpy(wBuf_.get(), (uint8_t*)&sz_nbo, sizeof(sz_nbo));
if (sz_hbo > 0) {
// Note that we reset wBase_ (with a pad for the frame size)
// prior to the underlying write to ensure we're in a sane state
// (i.e. internal buffer cleaned) if the underlying write throws
// up an exception
wBase_ = wBuf_.get() + sizeof(sz_nbo);
// Write size and frame body.
transport_->write(wBuf_.get(), sizeof(sz_nbo)+sz_hbo);
}
// Flush the underlying transport.
transport_->flush();
}
const uint8_t* TFramedTransport::borrowSlow(uint8_t* buf, uint32_t* len) {
// Don't try to be clever with shifting buffers.
// If the fast path failed let the protocol use its slow path.
// Besides, who is going to try to borrow across messages?
return NULL;
}
void TMemoryBuffer::computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give) {
// Correct rBound_ so we can use the fast path in the future.
rBound_ = wBase_;
// Decide how much to give.
uint32_t give = std::min(len, available_read());
*out_start = rBase_;
*out_give = give;
// Preincrement rBase_ so the caller doesn't have to.
rBase_ += give;
}
uint32_t TMemoryBuffer::readSlow(uint8_t* buf, uint32_t len) {
uint8_t* start;
uint32_t give;
computeRead(len, &start, &give);
// Copy into the provided buffer.
memcpy(buf, start, give);
return give;
}
uint32_t TMemoryBuffer::readAppendToString(std::string& str, uint32_t len) {
// Don't get some stupid assertion failure.
if (buffer_ == NULL) {
return 0;
}
uint8_t* start;
uint32_t give;
computeRead(len, &start, &give);
// Append to the provided string.
str.append((char*)start, give);
return give;
}
void TMemoryBuffer::ensureCanWrite(uint32_t len) {
// Check available space
uint32_t avail = available_write();
if (len <= avail) {
return;
}
if (!owner_) {
throw TTransportException("Insufficient space in external MemoryBuffer");
}
// Grow the buffer as necessary.
while (len > avail) {
bufferSize_ *= 2;
wBound_ = buffer_ + bufferSize_;
avail = available_write();
}
// Allocate into a new pointer so we don't bork ours if it fails.
void* new_buffer = std::realloc(buffer_, bufferSize_);
if (new_buffer == NULL) {
throw TTransportException("Out of memory.");
}
ptrdiff_t offset = (uint8_t*)new_buffer - buffer_;
buffer_ += offset;
rBase_ += offset;
rBound_ += offset;
wBase_ += offset;
wBound_ += offset;
}
void TMemoryBuffer::writeSlow(const uint8_t* buf, uint32_t len) {
ensureCanWrite(len);
// Copy into the buffer and increment wBase_.
memcpy(wBase_, buf, len);
wBase_ += len;
}
void TMemoryBuffer::wroteBytes(uint32_t len) {
uint32_t avail = available_write();
if (len > avail) {
throw TTransportException("Client wrote more bytes than size of buffer.");
}
wBase_ += len;
}
const uint8_t* TMemoryBuffer::borrowSlow(uint8_t* buf, uint32_t* len) {
rBound_ = wBase_;
if (available_read() >= *len) {
*len = available_read();
return rBase_;
}
return NULL;
}
}}} // apache::thrift::transport