| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| module client_pool_test; |
| |
| import core.sync.semaphore : Semaphore; |
| import core.time : Duration, dur; |
| import core.thread : Thread; |
| import std.algorithm; |
| import std.array; |
| import std.conv; |
| import std.exception; |
| import std.getopt; |
| import std.range; |
| import std.stdio; |
| import std.typecons; |
| import std.variant : Variant; |
| import thrift.base; |
| import thrift.async.libevent; |
| import thrift.async.socket; |
| import thrift.codegen.base; |
| import thrift.codegen.async_client; |
| import thrift.codegen.async_client_pool; |
| import thrift.codegen.client; |
| import thrift.codegen.client_pool; |
| import thrift.codegen.processor; |
| import thrift.protocol.base; |
| import thrift.protocol.binary; |
| import thrift.server.base; |
| import thrift.server.simple; |
| import thrift.server.transport.socket; |
| import thrift.transport.base; |
| import thrift.transport.buffered; |
| import thrift.transport.socket; |
| import thrift.util.cancellation; |
| import thrift.util.future; |
| |
// TestServiceException is used as the RPC-layer exception in this test. Using
// a dedicated type makes sure that socket/… problems (which would usually be
// considered RPC-layer faults) still cause the tests to fail, even though it
// is the RPC exception handling that is being tested.
class TestServiceException : TException {
  // Port of the handler that raised the exception; the tests compare it
  // against the list of server ports.
  int port;
}
| |
// Hand-written service definition with a single method, getPort(), which is
// declared to be able to throw a TestServiceException.
interface TestService {
  int getPort();

  // Make the module-level exception type available under the service's own
  // scope as well.
  alias TestServiceException = .TestServiceException;

  // Metadata for getPort(): no parameters, one declared exception ("a", id 1).
  // NOTE(review): presumably consumed by the thrift.codegen templates – confirm.
  enum methodMeta = [
    TMethodMeta(
      "getPort",
      [],
      [TExceptionMeta("a", 1, "TestServiceException")]
    )
  ];
}
| |
// A derived service is used for the actual tests, just to check that the
// client pools handle service inheritance correctly.
interface ExTestService : TestService {
  // Same information as getPort(), but wrapped in an array.
  int[] getPortInArray();

  // Metadata for the method added by this interface: no parameters, one
  // declared exception ("a", id 1).
  enum methodMeta = [
    TMethodMeta(
      "getPortInArray",
      [],
      [TExceptionMeta("a", 1, "TestServiceException")]
    )
  ];
}
| |
// Server-side implementation of ExTestService used by all tests in this file.
//
// Each handler answers with its own port number, optionally after sleeping for
// `delay`, and optionally by throwing a TestServiceException (carrying the
// port) instead of returning. The fields are public so the tests can flip
// `failing` while the servers are running.
class ExTestHandler : ExTestService {
  ushort port;
  Duration delay;
  bool failing;
  bool trace;

  this(ushort port, Duration delay, bool failing, bool trace) {
    this.port = port;
    this.delay = delay;
    this.failing = failing;
    this.trace = trace;
  }

  // Returns `port` after the configured delay, or throws a
  // TestServiceException if `failing` is set. Logs the call to stderr when
  // `trace` is enabled.
  override int getPort() {
    if (trace) {
      stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
        delay, failing);
    }
    simulateLatency();
    throwIfFailing();
    return port;
  }

  // Array flavour of getPort(); shares its delay/failure behavior.
  override int[] getPortInArray() {
    return [getPort()];
  }

private:
  // Blocks the calling thread for `delay`, if a positive delay is configured.
  void simulateLatency() {
    if (delay <= Duration.zero) return;
    Thread.sleep(delay);
  }

  // Throws a TestServiceException tagged with this handler's port when the
  // handler is configured to fail; does nothing otherwise.
  void throwIfFailing() {
    if (failing) {
      auto fault = new TestServiceException;
      fault.port = port;
      throw fault;
    }
  }
}
| |
// Server event handler whose only job is to notify a semaphore from
// preServe(), so that main() can wait for the servers to come up. All other
// callbacks are no-ops.
class ServerPreServeHandler : TServerEventHandler {
  // Semaphore notified once per preServe() call.
  private Semaphore sem_;

  this(Semaphore sem) {
    sem_ = sem;
  }

  override void preServe() {
    sem_.notify();
  }

  // No per-connection context is needed; an empty Variant is returned.
  Variant createContext(TProtocol input, TProtocol output) {
    return Variant.init;
  }

  void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {
    // Nothing to clean up.
  }

  void preProcess(Variant serverContext, TTransport transport) {
    // Nothing to do before a request is processed.
  }
}
| |
// Thread that runs a TSimpleServer for one ExTestHandler on the handler's
// port until the given cancellation is triggered.
class ServerThread : Thread {
  this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
    super(&run);
    handler_ = handler;
    serverHandler_ = serverHandler;
    cancellation_ = cancellation;
  }

private:
  ExTestHandler handler_;
  ServerPreServeHandler serverHandler_;
  TCancellation cancellation_;

  // Thread entry point: builds the server (binary protocol over buffered
  // transport) and serves until cancelled. Any exception is reported on
  // stdout instead of being propagated out of the thread.
  void run() {
    try {
      auto protocols = new TBinaryProtocolFactory!();
      auto requestProcessor = new TServiceProcessor!ExTestService(handler_);

      auto listenSocket = new TServerSocket(handler_.port);
      listenSocket.recvTimeout = dur!"seconds"(3);

      auto transports = new TBufferedTransportFactory;

      auto server = new TSimpleServer(requestProcessor, listenSocket,
        transports, protocols);
      server.eventHandler = serverHandler_;
      server.serve(cancellation_);
    } catch (Exception e) {
      writefln("Server thread on port %s failed: %s", handler_.port, e);
    }
  }
}
| |
// Test driver.
//
// Recognized command line options:
//   --port   base port (default 9090); six consecutive ports starting at this
//            one are used, one per test server.
//   --trace  make the handlers log every getPort() call to stderr.
//
// Starts six server threads (three succeeding and three failing handlers, with
// delays of 1/10/100 ms each), waits until every server has signalled that it
// is about to serve, and then runs the four client pool test suites. The
// servers are cancelled when main() exits, normally or via an exception.
void main(string[] args) {
  bool trace;
  ushort port = 9090;
  getopt(args, "port", &port, "trace", &trace);

  // Six consecutive ports are needed. Reject base ports for which the
  // cast(ushort) computations below would silently wrap around.
  enum serverCount = 6;
  enforce(port <= ushort.max - (serverCount - 1),
    text("--port must be at most ", ushort.max - (serverCount - 1),
      " because ", serverCount, " consecutive ports are used."));

  auto serverCancellation = new TCancellationOrigin;
  scope (exit) serverCancellation.trigger();

  immutable ports = cast(immutable)array(
    map!"cast(ushort)a"(iota(port, port + serverCount)));

  // Semaphore that is notified once by each server thread right before it
  // starts serving.
  Semaphore sem = new Semaphore(0);

  version (none) {
    // Cannot use this due to multiple DMD @@BUG@@s:
    // 1. »function D main is a nested function and cannot be accessed from array«
    //    when calling array() on the result of the outer map() – would have to
    //    manually do the eager evaluation/array conversion.
    // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
    //    can be worked around by calling array() on the map result first.
    // 3. Even when using the workarounds for the last two points, the DMD-built
    //    executable crashes when building without (sic!) inlining enabled,
    //    the backtrace points into the first delegate literal.
    auto handlers = array(map!((args){
      return new ExTestHandler(args._0, args._1, args._2, trace);
    })(zip(
      ports,
      map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
      [false, false, false, true, true, true]
    )));
  } else {
    auto handlers = [
      new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
      new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
      new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
      new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
      new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
      new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
    ];
  }

  // Fire up the server threads.
  foreach (h; handlers) {
    (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
  }

  // Wait until all the servers have signalled that they are ready to serve.
  // Semaphore.wait(Duration) returns false on timeout; fail right here in that
  // case instead of running the tests against a server that never came up.
  foreach (h; handlers) {
    enforce(sem.wait(dur!`seconds`(1)),
      "Timed out waiting for the test servers to start listening.");
  }

  syncClientPoolTest(ports, handlers);
  asyncClientPoolTest(ports, handlers);
  asyncFastestClientPoolTest(ports, handlers);
  asyncAggregatorTest(ports, handlers);
}
| |
| |
// Tests the synchronous client pool (as configured by makePool()) against the
// running test servers.
//
// Params:
//   ports    = ports of the test servers, in the same order as `handlers`.
//   handlers = the server-side handlers; handlers[0].failing is toggled
//              temporarily by the last test case.
void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  // One blocking client per server, typed as the common client base so they
  // can all go into the same pool.
  TClientBase!ExTestService[] clients;
  foreach (serverPort; ports) {
    clients ~= cast(TClientBase!ExTestService)tClient!ExTestService(
      tBinaryProtocol(new TSocket("127.0.0.1", serverPort))
    );
  }

  scope (exit) {
    foreach (client; clients) client.outputProtocol.transport.close();
  }

  // Case 1: the first client in the pool succeeds.
  {
    auto pool = makePool(clients);
    enforce(pool.getPort() == ports[0]);
  }

  // Case 2: all clients fail – a compound exception carrying one
  // TestServiceException per client, in pool order, is expected.
  {
    auto pool = makePool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort());
    enforce(e);
    auto failedPorts = map!"a.port"(cast(TestServiceException[])e.exceptions);
    enforce(equal(failedPorts, ports[3 .. $]));
  }

  // Case 3: the first clients fail, but a later one succeeds.
  {
    auto failingFirst = clients[3 .. $] ~ clients[0 .. 3];
    auto pool = makePool(failingFirst);
    enforce(pool.getPortInArray() == [ports[0]]);
  }

  // Case 4: a client is deactivated once it has failed too often, and is used
  // again after the disable duration has passed.
  {
    auto disableFor = dur!"msecs"(50);

    auto pool = makePool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = disableFor;

    // First server fails: the pool falls through to the second one.
    handlers[0].failing = true;
    enforce(pool.getPort() == ports[1]);

    // First server is healthy again, but its client is still disabled.
    handlers[0].failing = false;
    enforce(pool.getPort() == ports[1]);

    // After the disable duration the first client is expected to be back.
    Thread.sleep(disableFor);
    enforce(pool.getPort() == ports[0]);
  }
}
| |
// Builds a synchronous client pool over `clients` for the tests: client
// permutation is switched off so the clients are tried in the given order, and
// only TestServiceException is classified as an RPC-level fault.
auto makePool(TClientBase!ExTestService[] clients) {
  auto pool = tClientPool(clients);
  pool.permuteClients = false;
  pool.rpcFaultFilter = (Exception e) => cast(TestServiceException)e !is null;
  return pool;
}
| |
| |
// Tests the asynchronous client pool (as configured by makeAsyncPool())
// against the running test servers. Mirrors syncClientPoolTest().
//
// Params:
//   ports    = ports of the test servers, in the same order as `handlers`.
//   handlers = the server-side handlers; handlers[0].failing is toggled
//              temporarily by the last test case.
void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope (exit) {
    foreach (client; clients) client.transport.close();
  }

  // Case 1: the first client in the pool succeeds.
  {
    auto pool = makeAsyncPool(clients);
    enforce(pool.getPort() == ports[0]);
  }

  // Case 2: all clients fail – waiting for the result is expected to throw a
  // compound exception carrying one TestServiceException per client, in pool
  // order.
  {
    auto pool = makeAsyncPool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
    enforce(e);
    auto failedPorts = map!"a.port"(cast(TestServiceException[])e.exceptions);
    enforce(equal(failedPorts, ports[3 .. $]));
  }

  // Case 3: the first clients fail, but a later one succeeds.
  {
    auto failingFirst = clients[3 .. $] ~ clients[0 .. 3];
    auto pool = makeAsyncPool(failingFirst);
    enforce(pool.getPortInArray() == [ports[0]]);
  }

  // Case 4: a client is deactivated once it has failed too often, and is used
  // again after the disable duration has passed.
  {
    auto disableFor = dur!"msecs"(50);

    auto pool = makeAsyncPool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = disableFor;

    // First server fails: the pool falls through to the second one.
    handlers[0].failing = true;
    enforce(pool.getPort() == ports[1]);

    // First server is healthy again, but its client is still disabled.
    handlers[0].failing = false;
    enforce(pool.getPort() == ports[1]);

    // After the disable duration the first client is expected to be back.
    Thread.sleep(disableFor);
    enforce(pool.getPort() == ports[0]);
  }
}
| |
// Builds an asynchronous client pool over `clients` for the tests: client
// permutation is switched off so the clients are tried in the given order, and
// only TestServiceException is classified as an RPC-level fault.
auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
  auto pool = tAsyncClientPool(clients);
  pool.permuteClients = false;
  pool.rpcFaultFilter = (Exception e) => cast(TestServiceException)e !is null;
  return pool;
}
| |
// Creates one asynchronous ExTestService client per entry in `ports`, each
// connecting to 127.0.0.1 through `manager` with a buffered transport and the
// binary protocol. The result preserves the order of `ports`.
auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
  // The array is built with an explicit loop rather than array(map!(…)):
  // that combination ran into the DMD @@BUG@@ »function D main is a nested
  // function and cannot be accessed from array«.
  TAsyncClientBase!ExTestService[] clients;
  foreach (serverPort; ports) {
    clients ~= new TAsyncClient!ExTestService(
      new TAsyncSocket(manager, "127.0.0.1", serverPort),
      new TBufferedTransportFactory,
      new TBinaryProtocolFactory!(TBufferedTransport)
    );
  }
  return clients;
}
| |
| |
// Tests the "fastest" asynchronous client pool (as configured by
// makeAsyncFastestPool()) against the running test servers.
//
// Params:
//   ports    = ports of the test servers, in the same order as `handlers`.
//   handlers = the server-side handlers (not modified by this test).
void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope (exit) {
    foreach (client; clients) client.transport.close();
  }

  // Make sure the fastest client wins, even if the clients are handed to the
  // pool in some other (here: reversed) order.
  {
    auto reversed = array(retro(clients));
    auto pool = makeAsyncFastestPool(reversed);
    auto result = pool.getPort().waitGet();
    enforce(result == ports[0]);
  }

  // Try the case where all clients fail.
  {
    auto pool = makeAsyncFastestPool(clients[3 .. $]);
    auto e = cast(TCompoundOperationException)collectException(pool.getPort().waitGet());
    enforce(e);
    auto failedPorts = map!"a.port"(cast(TestServiceException[])e.exceptions);
    enforce(equal(failedPorts, ports[3 .. $]));
  }

  // Mixed pool without the first server: some clients fail and some succeed;
  // the result is expected to come from the second server.
  {
    auto pool = makeAsyncFastestPool(clients[1 .. $]);
    enforce(pool.getPortInArray() == [ports[1]]);
  }
}
| |
// Builds a "fastest" asynchronous client pool over `clients` for the tests,
// with only TestServiceException classified as an RPC-level fault.
auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
  auto pool = tAsyncFastestClientPool(clients);
  pool.rpcFaultFilter = (Exception e) => cast(TestServiceException)e !is null;
  return pool;
}
| |
| |
// Tests the asynchronous aggregator against the running test servers: its
// range interface, the default accumulators for scalar and array results, and
// custom accumulator functions.
//
// The expected values depend on the per-server delays and failure flags set
// up in main().
//
// Params:
//   ports    = ports of the test servers, in the same order as `handlers`.
//   handlers = the server-side handlers (not modified by this test).
void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope (exit) {
    foreach (client; clients) client.transport.close();
  }

  auto aggregator = tAsyncAggregator(
    cast(TAsyncClientBase!ExTestService[])clients);

  // Test aggregator range interface: with a 50 ms timeout, results from the
  // first two servers and exceptions from the first two failing servers are
  // expected, i.e. four completed calls in total.
  {
    auto range = aggregator.getPort().range(dur!"msecs"(50));
    enforce(equal(range, ports[0 .. 2]));
    enforce(equal(map!"a.port"(cast(TestServiceException[])range.exceptions),
      ports[3 .. $ - 1]));
    enforce(range.completedCount == 4);
  }

  // Test default accumulator for scalars.
  {
    // Waiting for completion yields the results of all succeeding servers.
    auto fullResult = aggregator.getPort().accumulate();
    enforce(fullResult.waitGet() == ports[0 .. 3]);

    // Finishing early after 20 ms yields only the first two results.
    auto partialResult = aggregator.getPort().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partialResult.finishGet() == ports[0 .. 2]);
  }

  // Test default accumulator for arrays.
  {
    auto fullResult = aggregator.getPortInArray().accumulate();
    enforce(fullResult.waitGet() == ports[0 .. 3]);

    auto partialResult = aggregator.getPortInArray().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partialResult.finishGet() == ports[0 .. 2]);
  }

  // Test custom accumulator.
  {
    // Accumulator that only looks at the results: sum them up.
    auto fullResult = aggregator.getPort().accumulate!(function(int[] results){
      return reduce!"a + b"(results);
    })();
    enforce(fullResult.waitGet() == ports[0] + ports[1] + ports[2]);

    // Accumulator that receives both results and exceptions.
    auto partialResult = aggregator.getPort().accumulate!(
      function(int[] results, Exception[] exceptions) {
        // Return a tuple of the parameters so we can check them outside of
        // this function (to verify the values, we need access to »ports«, but
        // due to DMD @@BUG5710@@, we can't use a delegate literal).
        return tuple(results, exceptions);
      }
    )();
    Thread.sleep(dur!"msecs"(20));
    auto resultTuple = partialResult.finishGet();
    enforce(resultTuple[0] == ports[0 .. 2]);
    enforce(equal(map!"a.port"(cast(TestServiceException[])resultTuple[1]),
      ports[3 .. $ - 1]));
  }
}