blob: 9fe143278738ef79761502462a6d57546b77e572 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module thrift.transport.piped;
import thrift.transport.base;
import thrift.transport.memory;
/**
* Pipes data request from one transport to another when readEnd()
* or writeEnd() is called.
*
* A typical use case would be to log requests on e.g. a socket to
* disk (i. e. pipe them to a TFileWriterTransport).
*
* The implementation keeps an internal buffer which expands to
* hold the whole amount of data read/written until the corresponding *End()
* method is called.
*
* Contrary to the C++ implementation, this doesn't introduce yet another layer
* of input/output buffering, all calls are passed to the underlying source
* transport verbatim.
*/
final class TPipedTransport(Source = TTransport) if (
isTTransport!Source
) : TBaseTransport {
/// The default initial buffer size if not explicitly specified, in bytes.
enum DEFAULT_INITIAL_BUFFER_SIZE = 512;
/**
* Constructs a new instance.
*
* By default, only reads are piped (pipeReads = true, pipeWrites = false).
*
* Params:
* srcTrans = The transport to which all requests are forwarded.
* dstTrans = The transport the read/written data is copied to.
* initialBufferSize = The default size of the read/write buffers, for
* performance tuning.
*/
this(Source srcTrans, TTransport dstTrans,
size_t initialBufferSize = DEFAULT_INITIAL_BUFFER_SIZE
) {
srcTrans_ = srcTrans;
dstTrans_ = dstTrans;
readBuffer_ = new TMemoryBuffer(initialBufferSize);
writeBuffer_ = new TMemoryBuffer(initialBufferSize);
pipeReads_ = true;
pipeWrites_ = false;
}
bool pipeReads() @property const {
return pipeReads_;
}
void pipeReads(bool value) @property {
if (!value) {
readBuffer_.reset();
}
pipeReads_ = value;
}
bool pipeWrites() @property const {
return pipeWrites_;
}
void pipeWrites(bool value) @property {
if (!value) {
writeBuffer_.reset();
}
pipeWrites_ = value;
}
override bool isOpen() {
return srcTrans_.isOpen();
}
override bool peek() {
return srcTrans_.peek();
}
override void open() {
srcTrans_.open();
}
override void close() {
srcTrans_.close();
}
override size_t read(ubyte[] buf) {
auto bytesRead = srcTrans_.read(buf);
if (pipeReads_) {
readBuffer_.write(buf[0 .. bytesRead]);
}
return bytesRead;
}
override size_t readEnd() {
if (pipeReads_) {
auto data = readBuffer_.getContents();
dstTrans_.write(data);
dstTrans_.flush();
readBuffer_.reset();
srcTrans_.readEnd();
// Return data.length instead of the readEnd() result of the source
// transports because it might not be available from it.
return data.length;
}
return srcTrans_.readEnd();
}
override void write(in ubyte[] buf) {
if (pipeWrites_) {
writeBuffer_.write(buf);
}
srcTrans_.write(buf);
}
override size_t writeEnd() {
if (pipeWrites_) {
auto data = writeBuffer_.getContents();
dstTrans_.write(data);
dstTrans_.flush();
writeBuffer_.reset();
srcTrans_.writeEnd();
// Return data.length instead of the readEnd() result of the source
// transports because it might not be available from it.
return data.length;
}
return srcTrans_.writeEnd();
}
override void flush() {
srcTrans_.flush();
}
private:
Source srcTrans_;
TTransport dstTrans_;
TMemoryBuffer readBuffer_;
TMemoryBuffer writeBuffer_;
bool pipeReads_;
bool pipeWrites_;
}
/**
* TPipedTransport construction helper to avoid having to explicitly
* specify the transport types, i.e. to allow the constructor being called
* using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
*/
TPipedTransport!Source tPipedTransport(Source)(
Source srcTrans, TTransport dstTrans
) if (isTTransport!Source) {
return new typeof(return)(srcTrans, dstTrans);
}
version (unittest) {
// DMD @@BUG@@: UFCS for std.array.empty doesn't work when import is moved
// into unittest block.
import std.array;
import std.exception : enforce;
}
unittest {
auto underlying = new TMemoryBuffer;
auto pipeTarget = new TMemoryBuffer;
auto trans = tPipedTransport(underlying, pipeTarget);
underlying.write(cast(ubyte[])"abcd");
ubyte[4] buffer;
trans.readAll(buffer[0 .. 2]);
enforce(buffer[0 .. 2] == "ab");
enforce(pipeTarget.getContents().empty);
trans.readEnd();
enforce(pipeTarget.getContents() == "ab");
pipeTarget.reset();
underlying.write(cast(ubyte[])"ef");
trans.readAll(buffer[0 .. 2]);
enforce(buffer[0 .. 2] == "cd");
enforce(pipeTarget.getContents().empty);
trans.readAll(buffer[0 .. 2]);
enforce(buffer[0 .. 2] == "ef");
enforce(pipeTarget.getContents().empty);
trans.readEnd();
enforce(pipeTarget.getContents() == "cdef");
}