blob: b24c97afd763c2a94c0339773c4537c84876c530 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
module client_pool_test;
import core.sync.semaphore : Semaphore;
import core.time : Duration, dur;
import core.thread : Thread;
import std.algorithm;
import std.array;
import std.conv;
import std.exception;
import std.getopt;
import std.range;
import std.stdio;
import std.typecons;
import std.variant : Variant;
import thrift.base;
import thrift.async.libevent;
import thrift.async.socket;
import thrift.codegen.base;
import thrift.codegen.async_client;
import thrift.codegen.async_client_pool;
import thrift.codegen.client;
import thrift.codegen.client_pool;
import thrift.codegen.processor;
import thrift.protocol.base;
import thrift.protocol.binary;
import thrift.server.base;
import thrift.server.simple;
import thrift.server.transport.socket;
import thrift.transport.base;
import thrift.transport.buffered;
import thrift.transport.socket;
import thrift.util.cancellation;
import thrift.util.future;
// We use this as our RPC-layer exception here to make sure socket/… problems
// (that would usually considered to be RPC layer faults) cause the tests to
// fail, even though we are testing the RPC exception handling.
class TestServiceException : TException {
int port;
}
interface TestService {
int getPort();
alias .TestServiceException TestServiceException;
enum methodMeta = [TMethodMeta("getPort", [],
[TExceptionMeta("a", 1, "TestServiceException")])];
}
// Use some derived service, just to check that the pools handle inheritance
// correctly.
interface ExTestService : TestService {
int[] getPortInArray();
enum methodMeta = [TMethodMeta("getPortInArray", [],
[TExceptionMeta("a", 1, "TestServiceException")])];
}
class ExTestHandler : ExTestService {
this(ushort port, Duration delay, bool failing, bool trace) {
this.port = port;
this.delay = delay;
this.failing = failing;
this.trace = trace;
}
override int getPort() {
if (trace) {
stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
delay, failing);
}
sleep();
failIfEnabled();
return port;
}
override int[] getPortInArray() {
return [getPort()];
}
ushort port;
Duration delay;
bool failing;
bool trace;
private:
void sleep() {
if (delay > dur!"hnsecs"(0)) Thread.sleep(delay);
}
void failIfEnabled() {
if (!failing) return;
auto e = new TestServiceException;
e.port = port;
throw e;
}
}
class ServerPreServeHandler : TServerEventHandler {
this(Semaphore sem) {
sem_ = sem;
}
override void preServe() {
sem_.notify();
}
Variant createContext(TProtocol input, TProtocol output) { return Variant.init; }
void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {}
void preProcess(Variant serverContext, TTransport transport) {}
private:
Semaphore sem_;
}
class ServerThread : Thread {
this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
super(&run);
handler_ = handler;
cancellation_ = cancellation;
serverHandler_ = serverHandler;
}
private:
void run() {
try {
auto protocolFactory = new TBinaryProtocolFactory!();
auto processor = new TServiceProcessor!ExTestService(handler_);
auto serverTransport = new TServerSocket(handler_.port);
serverTransport.recvTimeout = dur!"seconds"(3);
auto transportFactory = new TBufferedTransportFactory;
auto server = new TSimpleServer(processor, serverTransport, transportFactory, protocolFactory);
server.eventHandler = serverHandler_;
server.serve(cancellation_);
} catch (Exception e) {
writefln("Server thread on port %s failed: %s", handler_.port, e);
}
}
ExTestHandler handler_;
ServerPreServeHandler serverHandler_;
TCancellation cancellation_;
}
void main(string[] args) {
bool trace;
ushort port = 9090;
getopt(args, "port", &port, "trace", &trace);
auto serverCancellation = new TCancellationOrigin;
scope (exit) serverCancellation.trigger();
immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + 6)));
// semaphore that will be incremented whenever each server thread has bound and started listening
Semaphore sem = new Semaphore(0);
version (none) {
// Cannot use this due to multiple DMD @@BUG@@s:
// 1. »function D main is a nested function and cannot be accessed from array«
// when calling array() on the result of the outer map() – would have to
// manually do the eager evaluation/array conversion.
// 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
// can be worked around by calling array() on the map result first.
// 3. Even when using the workarounds for the last two points, the DMD-built
// executable crashes when building without (sic!) inlining enabled,
// the backtrace points into the first delegate literal.
auto handlers = array(map!((args){
return new ExTestHandler(args._0, args._1, args._2, trace);
})(zip(
ports,
map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
[false, false, false, true, true, true]
)));
} else {
auto handlers = [
new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
];
}
// Fire up the server threads.
foreach (h; handlers) (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
// wait until all the handlers signal that they're ready to serve
foreach (h; handlers) (sem.wait(dur!`seconds`(1)));
syncClientPoolTest(ports, handlers);
asyncClientPoolTest(ports, handlers);
asyncFastestClientPoolTest(ports, handlers);
asyncAggregatorTest(ports, handlers);
}
void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto clients = array(map!((a){
return cast(TClientBase!ExTestService)tClient!ExTestService(
tBinaryProtocol(new TSocket("127.0.0.1", a))
);
})(ports));
scope(exit) foreach (c; clients) c.outputProtocol.transport.close();
// Try the case where the first client succeeds.
{
enforce(makePool(clients).getPort() == ports[0]);
}
// Try the case where all clients fail.
{
auto pool = makePool(clients[3 .. $]);
auto e = cast(TCompoundOperationException)collectException(pool.getPort());
enforce(e);
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
ports[3 .. $]));
}
// Try the case where the first clients fail, but a later one succeeds.
{
auto pool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
enforce(pool.getPortInArray() == [ports[0]]);
}
// Make sure a client is properly deactivated when it has failed too often.
{
auto pool = makePool(clients);
pool.faultDisableCount = 1;
pool.faultDisableDuration = dur!"msecs"(50);
handlers[0].failing = true;
enforce(pool.getPort() == ports[1]);
handlers[0].failing = false;
enforce(pool.getPort() == ports[1]);
Thread.sleep(dur!"msecs"(50));
enforce(pool.getPort() == ports[0]);
}
}
auto makePool(TClientBase!ExTestService[] clients) {
auto p = tClientPool(clients);
p.permuteClients = false;
p.rpcFaultFilter = (Exception e) {
return (cast(TestServiceException)e !is null);
};
return p;
}
void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto manager = new TLibeventAsyncManager;
scope (exit) manager.stop(dur!"hnsecs"(0));
auto clients = makeAsyncClients(manager, ports);
scope(exit) foreach (c; clients) c.transport.close();
// Try the case where the first client succeeds.
{
enforce(makeAsyncPool(clients).getPort() == ports[0]);
}
// Try the case where all clients fail.
{
auto pool = makeAsyncPool(clients[3 .. $]);
auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
enforce(e);
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
ports[3 .. $]));
}
// Try the case where the first clients fail, but a later one succeeds.
{
auto pool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
enforce(pool.getPortInArray() == [ports[0]]);
}
// Make sure a client is properly deactivated when it has failed too often.
{
auto pool = makeAsyncPool(clients);
pool.faultDisableCount = 1;
pool.faultDisableDuration = dur!"msecs"(50);
handlers[0].failing = true;
enforce(pool.getPort() == ports[1]);
handlers[0].failing = false;
enforce(pool.getPort() == ports[1]);
Thread.sleep(dur!"msecs"(50));
enforce(pool.getPort() == ports[0]);
}
}
auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
auto p = tAsyncClientPool(clients);
p.permuteClients = false;
p.rpcFaultFilter = (Exception e) {
return (cast(TestServiceException)e !is null);
};
return p;
}
auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
// DMD @@BUG@@ workaround: Using array on the lazyHandlers map result leads
// to »function D main is a nested function and cannot be accessed from array«.
// Thus, we manually do the array conversion.
auto lazyClients = map!((a){
return new TAsyncClient!ExTestService(
new TAsyncSocket(manager, "127.0.0.1", a),
new TBufferedTransportFactory,
new TBinaryProtocolFactory!(TBufferedTransport)
);
})(ports);
TAsyncClientBase!ExTestService[] clients;
foreach (c; lazyClients) clients ~= c;
return clients;
}
void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto manager = new TLibeventAsyncManager;
scope (exit) manager.stop(dur!"hnsecs"(0));
auto clients = makeAsyncClients(manager, ports);
scope(exit) foreach (c; clients) c.transport.close();
// Make sure the fastest client wins, even if they are called in some other
// order.
{
auto result = makeAsyncFastestPool(array(retro(clients))).getPort().waitGet();
enforce(result == ports[0]);
}
// Try the case where all clients fail.
{
auto pool = makeAsyncFastestPool(clients[3 .. $]);
auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
enforce(e);
enforce(equal(map!"a.port"(cast(TestServiceException[])e.exceptions),
ports[3 .. $]));
}
// Try the case where the first clients fail, but a later one succeeds.
{
auto pool = makeAsyncFastestPool(clients[1 .. $]);
enforce(pool.getPortInArray() == [ports[1]]);
}
}
auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
auto p = tAsyncFastestClientPool(clients);
p.rpcFaultFilter = (Exception e) {
return (cast(TestServiceException)e !is null);
};
return p;
}
void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
auto manager = new TLibeventAsyncManager;
scope (exit) manager.stop(dur!"hnsecs"(0));
auto clients = makeAsyncClients(manager, ports);
scope(exit) foreach (c; clients) c.transport.close();
auto aggregator = tAsyncAggregator(
cast(TAsyncClientBase!ExTestService[])clients);
// Test aggregator range interface.
{
auto range = aggregator.getPort().range(dur!"msecs"(50));
enforce(equal(range, ports[0 .. 2][]));
enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
ports[3 .. $ - 1]));
enforce(range.completedCount == 4);
}
// Test default accumulator for scalars.
{
auto fullResult = aggregator.getPort().accumulate();
enforce(fullResult.waitGet() == ports[0 .. 3]);
auto partialResult = aggregator.getPort().accumulate();
Thread.sleep(dur!"msecs"(20));
enforce(partialResult.finishGet() == ports[0 .. 2]);
}
// Test default accumulator for arrays.
{
auto fullResult = aggregator.getPortInArray().accumulate();
enforce(fullResult.waitGet() == ports[0 .. 3]);
auto partialResult = aggregator.getPortInArray().accumulate();
Thread.sleep(dur!"msecs"(20));
enforce(partialResult.finishGet() == ports[0 .. 2]);
}
// Test custom accumulator.
{
auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
return reduce!"a + b"(results);
})();
enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);
auto partialResult = aggregator.getPort().accumulate!(
function(int[] results, Exception[] exceptions) {
// Return a tuple of the parameters so we can check them outside of
// this function (to verify the values, we need access to »ports«, but
// due to DMD @@BUG5710@@, we can't use a delegate literal).f
return tuple(results, exceptions);
}
)();
Thread.sleep(dur!"msecs"(20));
auto resultTuple = partialResult.finishGet();
enforce(resultTuple[0] == ports[0 .. 2]);
enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]),
ports[3 .. $ - 1]));
}
}