| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| module thrift.transport.buffered; |
| |
| import std.algorithm : min; |
| import std.array : empty; |
| import std.exception : enforce; |
| import thrift.transport.base; |
| |
| /** |
| * Wraps another transport and buffers reads and writes until the internal |
| * buffers are exhausted, at which point new data is fetched resp. the |
| * accumulated data is written out at once. |
| */ |
| final class TBufferedTransport : TBaseTransport { |
| /** |
| * Constructs a new instance, using the default buffer sizes. |
| * |
| * Params: |
| * transport = The underlying transport to wrap. |
| */ |
| this(TTransport transport) { |
| this(transport, DEFAULT_BUFFER_SIZE); |
| } |
| |
| /** |
| * Constructs a new instance, using the specified buffer size. |
| * |
| * Params: |
| * transport = The underlying transport to wrap. |
| * bufferSize = The size of the read and write buffers to use, in bytes. |
| */ |
| this(TTransport transport, size_t bufferSize) { |
| this(transport, bufferSize, bufferSize); |
| } |
| |
| /** |
| * Constructs a new instance, using the specified buffer size. |
| * |
| * Params: |
| * transport = The underlying transport to wrap. |
| * readBufferSize = The size of the read buffer to use, in bytes. |
| * writeBufferSize = The size of the write buffer to use, in bytes. |
| */ |
| this(TTransport transport, size_t readBufferSize, size_t writeBufferSize) { |
| transport_ = transport; |
| readBuffer_ = new ubyte[readBufferSize]; |
| writeBuffer_ = new ubyte[writeBufferSize]; |
| writeAvail_ = writeBuffer_; |
| } |
| |
| /// The default size of the read/write buffers, in bytes. |
| enum int DEFAULT_BUFFER_SIZE = 512; |
| |
| override bool isOpen() @property { |
| return transport_.isOpen(); |
| } |
| |
| override bool peek() { |
| if (readAvail_.empty) { |
| // If there is nothing available to read, see if we can get something |
| // from the underlying transport. |
| auto bytesRead = transport_.read(readBuffer_); |
| readAvail_ = readBuffer_[0 .. bytesRead]; |
| } |
| |
| return !readAvail_.empty; |
| } |
| |
| override void open() { |
| transport_.open(); |
| } |
| |
| override void close() { |
| if (!isOpen) return; |
| flush(); |
| transport_.close(); |
| } |
| |
| override size_t read(ubyte[] buf) { |
| if (readAvail_.empty) { |
| // No data left in our buffer, fetch some from the underlying transport. |
| |
| if (buf.length > readBuffer_.length) { |
| // If the amount of data requested is larger than our reading buffer, |
| // directly read to the passed buffer. This probably doesn't occur too |
| // often in practice (and even if it does, the underlying transport |
| // probably cannot fulfill the request at once anyway), but it can't |
| // harm to try… |
| return transport_.read(buf); |
| } |
| |
| auto bytesRead = transport_.read(readBuffer_); |
| readAvail_ = readBuffer_[0 .. bytesRead]; |
| } |
| |
| // Hand over whatever we have. |
| auto give = min(readAvail_.length, buf.length); |
| buf[0 .. give] = readAvail_[0 .. give]; |
| readAvail_ = readAvail_[give .. $]; |
| return give; |
| } |
| |
| /** |
| * Shortcut version of readAll. |
| */ |
| override void readAll(ubyte[] buf) { |
| if (readAvail_.length >= buf.length) { |
| buf[] = readAvail_[0 .. buf.length]; |
| readAvail_ = readAvail_[buf.length .. $]; |
| return; |
| } |
| |
| super.readAll(buf); |
| } |
| |
| override void write(in ubyte[] buf) { |
| if (writeAvail_.length >= buf.length) { |
| // If the data fits in the buffer, just save it there. |
| writeAvail_[0 .. buf.length] = buf; |
| writeAvail_ = writeAvail_[buf.length .. $]; |
| return; |
| } |
| |
| // We have to decide if we copy data from buf to our internal buffer, or |
| // just directly write them out. The same considerations about avoiding |
| // syscalls as for C++ apply here. |
| auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr; |
| if ((bytesAvail + buf.length >= 2 * writeBuffer_.length) || (bytesAvail == 0)) { |
| // We would immediately need two syscalls anyway (or we don't have |
| // anything) in our buffer to write, so just write out both buffers. |
| if (bytesAvail > 0) { |
| transport_.write(writeBuffer_[0 .. bytesAvail]); |
| writeAvail_ = writeBuffer_; |
| } |
| |
| transport_.write(buf); |
| return; |
| } |
| |
| // Fill up our internal buffer for a write. |
| writeAvail_[] = buf[0 .. writeAvail_.length]; |
| auto left = buf[writeAvail_.length .. $]; |
| transport_.write(writeBuffer_); |
| |
| // Copy the rest into our buffer. |
| writeBuffer_[0 .. left.length] = left[]; |
| writeAvail_ = writeBuffer_[left.length .. $]; |
| } |
| |
| override void flush() { |
| // Write out any data waiting in the write buffer. |
| auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr; |
| if (bytesAvail > 0) { |
| // Note that we reset writeAvail_ prior to calling the underlying protocol |
| // to make sure the buffer is cleared even if the transport throws an |
| // exception. |
| writeAvail_ = writeBuffer_; |
| transport_.write(writeBuffer_[0 .. bytesAvail]); |
| } |
| |
| // Flush the underlying transport. |
| transport_.flush(); |
| } |
| |
| override const(ubyte)[] borrow(ubyte* buf, size_t len) { |
| if (len <= readAvail_.length) { |
| return readAvail_; |
| } |
| return null; |
| } |
| |
| override void consume(size_t len) { |
| enforce(len <= readBuffer_.length, new TTransportException( |
| "Invalid consume length.", TTransportException.Type.BAD_ARGS)); |
| readAvail_ = readAvail_[len .. $]; |
| } |
| |
| /** |
| * The wrapped transport. |
| */ |
| TTransport underlyingTransport() @property { |
| return transport_; |
| } |
| |
| private: |
| TTransport transport_; |
| |
| ubyte[] readBuffer_; |
| ubyte[] writeBuffer_; |
| |
| ubyte[] readAvail_; |
| ubyte[] writeAvail_; |
| } |
| |
| /** |
| * Wraps given transports into TBufferedTransports. |
| */ |
| alias TWrapperTransportFactory!TBufferedTransport TBufferedTransportFactory; |