blob: 94effbbaf8ec97afa7c36cd0ab52085067543b38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module thrift.transport.framed;
import core.bitop : bswap;
import std.algorithm : min;
import std.array : empty;
import std.exception : enforce;
import thrift.transport.base;
/**
* Framed transport.
*
* All writes go into an in-memory buffer until flush is called, at which point
* the transport writes the length of the entire binary chunk followed by the
* data payload. The receiver on the other end then performs a single
* »fixed-length« read to get the whole message off the wire.
*/
final class TFramedTransport : TBaseTransport {
/**
* Constructs a new framed transport.
*
* Params:
* transport = The underlying transport to wrap.
*/
this(TTransport transport) {
transport_ = transport;
}
/**
* Returns the wrapped transport.
*/
TTransport underlyingTransport() @property {
return transport_;
}
override bool isOpen() @property {
return transport_.isOpen;
}
override bool peek() {
return rBuf_.length > 0 || transport_.peek();
}
override void open() {
transport_.open();
}
override void close() {
flush();
transport_.close();
}
/**
* Attempts to read data into the given buffer, stopping when the buffer is
* exhausted or the frame end is reached.
*
* TODO: Contrary to the C++ implementation, this never does cross-frame
* reads – is there actually a valid use case for that?
*
* Params:
* buf = Slice to use as buffer.
*
* Returns: How many bytes were actually read.
*
* Throws: TTransportException if an error occurs.
*/
override size_t read(ubyte[] buf) {
// If the buffer is empty, read a new frame off the wire.
if (rBuf_.empty) {
bool gotFrame = readFrame();
if (!gotFrame) return 0;
}
auto size = min(rBuf_.length, buf.length);
buf[0..size] = rBuf_[0..size];
rBuf_ = rBuf_[size..$];
return size;
}
override void write(in ubyte[] buf) {
wBuf_ ~= buf;
}
override void flush() {
if (wBuf_.empty) return;
// Properly reset the write buffer even some of the protocol operations go
// wrong.
scope (exit) {
wBuf_.length = 0;
wBuf_.assumeSafeAppend();
}
int len = bswap(cast(int)wBuf_.length);
transport_.write(cast(ubyte[])(&len)[0..1]);
transport_.write(wBuf_);
transport_.flush();
}
override const(ubyte)[] borrow(ubyte* buf, size_t len) {
if (len <= rBuf_.length) {
return rBuf_;
} else {
// Don't try attempting cross-frame borrows, trying that does not make
// much sense anyway.
return null;
}
}
override void consume(size_t len) {
enforce(len <= rBuf_.length, new TTransportException(
"Invalid consume length", TTransportException.Type.BAD_ARGS));
rBuf_ = rBuf_[len .. $];
}
private:
bool readFrame() {
// Read the size of the next frame. We can't use readAll() since that
// always throws an exception on EOF, but want to throw an exception only
// if EOF occurs after partial size data.
int size;
size_t size_read;
while (size_read < size.sizeof) {
auto data = (cast(ubyte*)&size)[size_read..size.sizeof];
auto read = transport_.read(data);
if (read == 0) {
if (size_read == 0) {
// EOF before any data was read.
return false;
} else {
// EOF after a partial frame header – illegal.
throw new TTransportException(
"No more data to read after partial frame header",
TTransportException.Type.END_OF_FILE
);
}
}
size_read += read;
}
size = bswap(size);
enforce(size >= 0, new TTransportException("Frame size has negative value",
TTransportException.Type.CORRUPTED_DATA));
// TODO: Benchmark this.
rBuf_.length = size;
rBuf_.assumeSafeAppend();
transport_.readAll(rBuf_);
return true;
}
TTransport transport_;
ubyte[] rBuf_;
ubyte[] wBuf_;
}
/**
* Wraps given transports into TFramedTransports.
*/
alias TWrapperTransportFactory!TFramedTransport TFramedTransportFactory;
version (unittest) {
import std.random : Mt19937, uniform;
import thrift.transport.memory;
}
// Some basic random testing, always starting with the same seed for
// deterministic unit test results – more tests in transport_test.
unittest {
auto randGen = Mt19937(42);
// 32 kiB of data to work with.
auto data = new ubyte[1 << 15];
foreach (ref b; data) {
b = uniform!"[]"(cast(ubyte)0, cast(ubyte)255, randGen);
}
// Generate a list of chunk sizes to split the data into. A uniform
// distribution is not quite realistic, but std.random doesn't have anything
// else yet.
enum MAX_FRAME_LENGTH = 512;
auto chunkSizesList = new size_t[][2];
foreach (ref chunkSizes; chunkSizesList) {
size_t sum;
while (true) {
auto curLen = uniform(0, MAX_FRAME_LENGTH, randGen);
sum += curLen;
if (sum > data.length) break;
chunkSizes ~= curLen;
}
}
chunkSizesList ~= [data.length]; // Also test whole chunk at once.
// Test writing data.
{
foreach (chunkSizes; chunkSizesList) {
auto buf = new TMemoryBuffer;
auto framed = new TFramedTransport(buf);
auto remainingData = data;
foreach (chunkSize; chunkSizes) {
framed.write(remainingData[0..chunkSize]);
remainingData = remainingData[chunkSize..$];
}
framed.flush();
auto writtenData = data[0..($ - remainingData.length)];
auto actualData = buf.getContents();
// Check frame size.
int frameSize = bswap((cast(int[])(actualData[0..int.sizeof]))[0]);
enforce(frameSize == writtenData.length);
// Check actual data.
enforce(actualData[int.sizeof..$] == writtenData);
}
}
// Test reading data.
{
foreach (chunkSizes; chunkSizesList) {
auto buf = new TMemoryBuffer;
auto size = bswap(cast(int)data.length);
buf.write(cast(ubyte[])(&size)[0..1]);
buf.write(data);
auto framed = new TFramedTransport(buf);
ubyte[] readData;
readData.reserve(data.length);
foreach (chunkSize; chunkSizes) {
// This should work with read because we have one huge frame.
auto oldReadLen = readData.length;
readData.length += chunkSize;
framed.read(readData[oldReadLen..$]);
}
enforce(readData == data[0..readData.length]);
}
}
// Test combined reading/writing of multiple frames.
foreach (flushProbability; [1, 2, 4, 8, 16, 32]) {
foreach (chunkSizes; chunkSizesList) {
auto buf = new TMemoryBuffer;
auto framed = new TFramedTransport(buf);
size_t[] frameSizes;
// Write the data.
size_t frameSize;
auto remainingData = data;
foreach (chunkSize; chunkSizes) {
framed.write(remainingData[0..chunkSize]);
remainingData = remainingData[chunkSize..$];
frameSize += chunkSize;
if (frameSize > 0 && uniform(0, flushProbability, randGen) == 0) {
frameSizes ~= frameSize;
frameSize = 0;
framed.flush();
}
}
if (frameSize > 0) {
frameSizes ~= frameSize;
frameSize = 0;
framed.flush();
}
// Read it back.
auto readData = new ubyte[data.length - remainingData.length];
auto remainToRead = readData;
foreach (fSize; frameSizes) {
// We are exploiting an implementation detail of TFramedTransport:
// The read buffer starts empty and it will never return more than one
// frame per read, so by just requesting all of the data, we should
// always get exactly one frame.
auto got = framed.read(remainToRead);
enforce(got == fSize);
remainToRead = remainToRead[fSize..$];
}
enforce(remainToRead.empty);
enforce(readData == data[0..readData.length]);
}
}
}
// Test flush()ing an empty buffer.
unittest {
auto buf = new TMemoryBuffer();
auto framed = new TFramedTransport(buf);
immutable out1 = [0, 0, 0, 1, 'a'];
immutable out2 = [0, 0, 0, 1, 'a', 0, 0, 0, 2, 'b', 'c'];
framed.flush();
enforce(buf.getContents() == []);
framed.flush();
framed.flush();
enforce(buf.getContents() == []);
framed.write(cast(ubyte[])"a");
enforce(buf.getContents() == []);
framed.flush();
enforce(buf.getContents() == out1);
framed.flush();
framed.flush();
enforce(buf.getContents() == out1);
framed.write(cast(ubyte[])"bc");
enforce(buf.getContents() == out1);
framed.flush();
enforce(buf.getContents() == out2);
framed.flush();
framed.flush();
enforce(buf.getContents() == out2);
}