| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| /** |
| * Utilities for asynchronously querying multiple servers, building on |
| * TAsyncClient. |
| * |
| * Terminology note: The names of the artifacts defined in this module are |
| * derived from »client pool«, because they operate on a pool of |
| * TAsyncClients. However, from a architectural point of view, they often |
| * represent a pool of hosts a Thrift client application communicates with |
| * using RPC calls. |
| */ |
| module thrift.codegen.async_client_pool; |
| |
| import core.sync.mutex; |
| import core.time : Duration, dur; |
| import std.algorithm : map; |
| import std.array : array, empty; |
| import std.exception : enforce; |
| import std.traits : ParameterTypeTuple, ReturnType; |
| import thrift.base; |
| import thrift.codegen.base; |
| import thrift.codegen.async_client; |
| import thrift.internal.algorithm; |
| import thrift.internal.codegen; |
| import thrift.util.awaitable; |
| import thrift.util.cancellation; |
| import thrift.util.future; |
| import thrift.internal.resource_pool; |
| |
| /** |
| * Represents a generic client pool which implements TFutureInterface!Interface |
| * using multiple TAsyncClients. |
| */ |
| interface TAsyncClientPoolBase(Interface) if (isService!Interface) : |
| TFutureInterface!Interface |
| { |
| /// Shorthand for the client type this pool operates on. |
| alias TAsyncClientBase!Interface Client; |
| |
| /** |
| * Adds a client to the pool. |
| */ |
| void addClient(Client client); |
| |
| /** |
| * Removes a client from the pool. |
| * |
| * Returns: Whether the client was found in the pool. |
| */ |
| bool removeClient(Client client); |
| |
| /** |
| * Called to determine whether an exception comes from a client from the |
| * pool not working properly, or if it an exception thrown at the |
| * application level. |
| * |
| * If the delegate returns true, the server/connection is considered to be |
| * at fault, if it returns false, the exception is just passed on to the |
| * caller. |
| * |
| * By default, returns true for instances of TTransportException and |
| * TApplicationException, false otherwise. |
| */ |
| bool delegate(Exception) rpcFaultFilter() const @property; |
| void rpcFaultFilter(bool delegate(Exception)) @property; /// Ditto |
| |
| /** |
| * Whether to open the underlying transports of a client before trying to |
| * execute a method if they are not open. This is usually desirable |
| * because it allows e.g. to automatically reconnect to a remote server |
| * if the network connection is dropped. |
| * |
| * Defaults to true. |
| */ |
| bool reopenTransports() const @property; |
| void reopenTransports(bool) @property; /// Ditto |
| } |
| |
| immutable bool delegate(Exception) defaultRpcFaultFilter; |
| static this() { |
| defaultRpcFaultFilter = (Exception e) { |
| import thrift.protocol.base; |
| import thrift.transport.base; |
| return ( |
| (cast(TTransportException)e !is null) || |
| (cast(TApplicationException)e !is null) |
| ); |
| }; |
| } |
| |
| /** |
| * A TAsyncClientPoolBase implementation which queries multiple servers in a |
| * row until a request succeeds, the result of which is then returned. |
| * |
| * The definition of »success« can be customized using the rpcFaultFilter() |
| * delegate property. If it is non-null and calling it for an exception set by |
| * a failed method invocation returns true, the error is considered to be |
| * caused by the RPC layer rather than the application layer, and the next |
| * server in the pool is tried. If there are no more clients to try, the |
| * operation is marked as failed with a TCompoundOperationException. |
| * |
| * If a TAsyncClient in the pool fails with an RPC exception for a number of |
| * consecutive tries, it is temporarily disabled (not tried any longer) for |
| * a certain duration. Both the limit and the timeout can be configured. If all |
| * clients fail (and keepTrying is false), the operation fails with a |
| * TCompoundOperationException which contains the collected RPC exceptions. |
| */ |
| final class TAsyncClientPool(Interface) if (isService!Interface) : |
| TAsyncClientPoolBase!Interface |
| { |
| /// |
| this(Client[] clients) { |
| pool_ = new TResourcePool!Client(clients); |
| rpcFaultFilter_ = defaultRpcFaultFilter; |
| reopenTransports_ = true; |
| } |
| |
| /+override+/ void addClient(Client client) { |
| pool_.add(client); |
| } |
| |
| /+override+/ bool removeClient(Client client) { |
| return pool_.remove(client); |
| } |
| |
| /** |
| * Whether to keep trying to find a working client if all have failed in a |
| * row. |
| * |
| * Defaults to false. |
| */ |
| bool keepTrying() const @property { |
| return pool_.cycle; |
| } |
| |
| /// Ditto |
| void keepTrying(bool value) @property { |
| pool_.cycle = value; |
| } |
| |
| /** |
| * Whether to use a random permutation of the client pool on every call to |
| * execute(). This can be used e.g. as a simple form of load balancing. |
| * |
| * Defaults to true. |
| */ |
| bool permuteClients() const @property { |
| return pool_.permute; |
| } |
| |
| /// Ditto |
| void permuteClients(bool value) @property { |
| pool_.permute = value; |
| } |
| |
| /** |
| * The number of consecutive faults after which a client is disabled until |
| * faultDisableDuration has passed. 0 to never disable clients. |
| * |
| * Defaults to 0. |
| */ |
| ushort faultDisableCount() const @property { |
| return pool_.faultDisableCount; |
| } |
| |
| /// Ditto |
| void faultDisableCount(ushort value) @property { |
| pool_.faultDisableCount = value; |
| } |
| |
| /** |
| * The duration for which a client is no longer considered after it has |
| * failed too often. |
| * |
| * Defaults to one second. |
| */ |
| Duration faultDisableDuration() const @property { |
| return pool_.faultDisableDuration; |
| } |
| |
| /// Ditto |
| void faultDisableDuration(Duration value) @property { |
| pool_.faultDisableDuration = value; |
| } |
| |
| /+override+/ bool delegate(Exception) rpcFaultFilter() const @property { |
| return rpcFaultFilter_; |
| } |
| |
| /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property { |
| rpcFaultFilter_ = value; |
| } |
| |
| /+override+/ bool reopenTransports() const @property { |
| return reopenTransports_; |
| } |
| |
| /+override+/ void reopenTransports(bool value) @property { |
| reopenTransports_ = value; |
| } |
| |
| mixin(fallbackPoolForwardCode!Interface()); |
| |
| protected: |
| // The actual worker implementation to which RPC method calls are forwarded. |
| auto executeOnPool(string method, Args...)(Args args, |
| TCancellation cancellation |
| ) { |
| auto clients = pool_[]; |
| if (clients.empty) { |
| throw new TException("No clients available to try."); |
| } |
| |
| auto promise = new TPromise!(ReturnType!(MemberType!(Interface, method))); |
| Exception[] rpcExceptions; |
| |
| void tryNext() { |
| while (clients.empty) { |
| Client next; |
| Duration waitTime; |
| if (clients.willBecomeNonempty(next, waitTime)) { |
| if (waitTime > dur!"hnsecs"(0)) { |
| if (waitTime < dur!"usecs"(10)) { |
| import core.thread; |
| Thread.sleep(waitTime); |
| } else { |
| next.transport.asyncManager.delay(waitTime, { tryNext(); }); |
| return; |
| } |
| } |
| } else { |
| promise.fail(new TCompoundOperationException("All clients failed.", |
| rpcExceptions)); |
| return; |
| } |
| } |
| |
| auto client = clients.front; |
| clients.popFront; |
| |
| if (reopenTransports) { |
| if (!client.transport.isOpen) { |
| try { |
| client.transport.open(); |
| } catch (Exception e) { |
| pool_.recordFault(client); |
| tryNext(); |
| return; |
| } |
| } |
| } |
| |
| auto future = mixin("client." ~ method)(args, cancellation); |
| future.completion.addCallback({ |
| if (future.status == TFutureStatus.CANCELLED) { |
| promise.cancel(); |
| return; |
| } |
| |
| auto e = future.getException(); |
| if (e) { |
| if (rpcFaultFilter_ && rpcFaultFilter_(e)) { |
| pool_.recordFault(client); |
| rpcExceptions ~= e; |
| tryNext(); |
| return; |
| } |
| } |
| pool_.recordSuccess(client); |
| promise.complete(future); |
| }); |
| } |
| |
| tryNext(); |
| return promise; |
| } |
| |
| private: |
| TResourcePool!Client pool_; |
| bool delegate(Exception) rpcFaultFilter_; |
| bool reopenTransports_; |
| } |
| |
| /** |
| * TAsyncClientPool construction helper to avoid having to explicitly |
| * specify the interface type, i.e. to allow the constructor being called |
| * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). |
| */ |
| TAsyncClientPool!Interface tAsyncClientPool(Interface)( |
| TAsyncClientBase!Interface[] clients |
| ) if (isService!Interface) { |
| return new typeof(return)(clients); |
| } |
| |
| private { |
| // Cannot use an anonymous delegate literal for this because they aren't |
| // allowed in class scope. |
| string fallbackPoolForwardCode(Interface)() { |
| string code = ""; |
| |
| foreach (methodName; AllMemberMethodNames!Interface) { |
| enum qn = "Interface." ~ methodName; |
| code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ |
| "(ParameterTypeTuple!(" ~ qn ~ ") args, TCancellation cancellation = null) {\n"; |
| code ~= "return executeOnPool!(`" ~ methodName ~ "`)(args, cancellation);\n"; |
| code ~= "}\n"; |
| } |
| |
| return code; |
| } |
| } |
| |
| /** |
| * A TAsyncClientPoolBase implementation which queries multiple servers at |
| * the same time and returns the first success response. |
| * |
| * The definition of »success« can be customized using the rpcFaultFilter() |
| * delegate property. If it is non-null and calling it for an exception set by |
| * a failed method invocation returns true, the error is considered to be |
| * caused by the RPC layer rather than the application layer, and the next |
| * server in the pool is tried. If all clients fail, the operation is marked |
| * as failed with a TCompoundOperationException. |
| */ |
| final class TAsyncFastestClientPool(Interface) if (isService!Interface) : |
| TAsyncClientPoolBase!Interface |
| { |
| /// |
| this(Client[] clients) { |
| clients_ = clients; |
| rpcFaultFilter_ = defaultRpcFaultFilter; |
| reopenTransports_ = true; |
| } |
| |
| /+override+/ void addClient(Client client) { |
| clients_ ~= client; |
| } |
| |
| /+override+/ bool removeClient(Client client) { |
| auto oldLength = clients_.length; |
| clients_ = removeEqual(clients_, client); |
| return clients_.length < oldLength; |
| } |
| |
| |
| /+override+/ bool delegate(Exception) rpcFaultFilter() const @property { |
| return rpcFaultFilter_; |
| } |
| |
| /+override+/ void rpcFaultFilter(bool delegate(Exception) value) @property { |
| rpcFaultFilter_ = value; |
| } |
| |
| /+override+/bool reopenTransports() const @property { |
| return reopenTransports_; |
| } |
| |
| /+override+/ void reopenTransports(bool value) @property { |
| reopenTransports_ = value; |
| } |
| |
| mixin(fastestPoolForwardCode!Interface()); |
| |
| private: |
| Client[] clients_; |
| bool delegate(Exception) rpcFaultFilter_; |
| bool reopenTransports_; |
| } |
| |
| /** |
| * TAsyncFastestClientPool construction helper to avoid having to explicitly |
| * specify the interface type, i.e. to allow the constructor being called |
| * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). |
| */ |
| TAsyncFastestClientPool!Interface tAsyncFastestClientPool(Interface)( |
| TAsyncClientBase!Interface[] clients |
| ) if (isService!Interface) { |
| return new typeof(return)(clients); |
| } |
| |
| private { |
| // Cannot use an anonymous delegate literal for this because they aren't |
| // allowed in class scope. |
| string fastestPoolForwardCode(Interface)() { |
| string code = ""; |
| |
| foreach (methodName; AllMemberMethodNames!Interface) { |
| enum qn = "Interface." ~ methodName; |
| code ~= "TFuture!(ReturnType!(" ~ qn ~ ")) " ~ methodName ~ |
| "(ParameterTypeTuple!(" ~ qn ~ ") args, " ~ |
| "TCancellation cancellation = null) {\n"; |
| code ~= "enum methodName = `" ~ methodName ~ "`;\n"; |
| code ~= q{ |
| alias ReturnType!(MemberType!(Interface, methodName)) ResultType; |
| |
| auto childCancellation = new TCancellationOrigin; |
| |
| TFuture!ResultType[] futures; |
| futures.reserve(clients_.length); |
| |
| foreach (c; clients_) { |
| if (reopenTransports) { |
| if (!c.transport.isOpen) { |
| try { |
| c.transport.open(); |
| } catch (Exception e) { |
| continue; |
| } |
| } |
| } |
| futures ~= mixin("c." ~ methodName)(args, childCancellation); |
| } |
| |
| return new FastestPoolJob!(ResultType)( |
| futures, rpcFaultFilter, cancellation, childCancellation); |
| }; |
| code ~= "}\n"; |
| } |
| |
| return code; |
| } |
| |
| final class FastestPoolJob(Result) : TFuture!Result { |
| this(TFuture!Result[] poolFutures, bool delegate(Exception) rpcFaultFilter, |
| TCancellation cancellation, TCancellationOrigin childCancellation |
| ) { |
| resultPromise_ = new TPromise!Result; |
| poolFutures_ = poolFutures; |
| rpcFaultFilter_ = rpcFaultFilter; |
| childCancellation_ = childCancellation; |
| |
| foreach (future; poolFutures) { |
| future.completion.addCallback({ |
| auto f = future; |
| return { completionCallback(f); }; |
| }()); |
| if (future.status != TFutureStatus.RUNNING) { |
| // If the current future is already completed, we are done, don't |
| // bother adding callbacks for the others (they would just return |
| // immediately after acquiring the lock). |
| return; |
| } |
| } |
| |
| if (cancellation) { |
| cancellation.triggering.addCallback({ |
| resultPromise_.cancel(); |
| childCancellation.trigger(); |
| }); |
| } |
| } |
| |
| TFutureStatus status() const @property { |
| return resultPromise_.status; |
| } |
| |
| TAwaitable completion() @property { |
| return resultPromise_.completion; |
| } |
| |
| Result get() { |
| return resultPromise_.get(); |
| } |
| |
| Exception getException() { |
| return resultPromise_.getException(); |
| } |
| |
| private: |
| void completionCallback(TFuture!Result future) { |
| synchronized { |
| if (future.status == TFutureStatus.CANCELLED) { |
| assert(resultPromise_.status != TFutureStatus.RUNNING); |
| return; |
| } |
| |
| if (resultPromise_.status != TFutureStatus.RUNNING) { |
| // The operation has already been completed. This can happen if |
| // another client completed first, but this callback was already |
| // waiting for the lock when it called cancel(). |
| return; |
| } |
| |
| if (future.status == TFutureStatus.FAILED) { |
| auto e = future.getException(); |
| if (rpcFaultFilter_ && rpcFaultFilter_(e)) { |
| rpcExceptions_ ~= e; |
| |
| if (rpcExceptions_.length == poolFutures_.length) { |
| resultPromise_.fail(new TCompoundOperationException( |
| "All child operations failed, unable to retrieve a result.", |
| rpcExceptions_ |
| )); |
| } |
| |
| return; |
| } |
| } |
| |
| // Store the result to the target promise. |
| resultPromise_.complete(future); |
| |
| // Cancel the other futures, we would just discard their results. |
| // Note: We do this after we have stored the results to our promise, |
| // see the assert at the top of the function. |
| childCancellation_.trigger(); |
| } |
| } |
| |
| TPromise!Result resultPromise_; |
| TFuture!Result[] poolFutures_; |
| Exception[] rpcExceptions_; |
| bool delegate(Exception) rpcFaultFilter_; |
| TCancellationOrigin childCancellation_; |
| } |
| } |
| |
| /** |
| * Allows easily aggregating results from a number of TAsyncClients. |
| * |
| * Contrary to TAsync{Fallback, Fastest}ClientPool, this class does not |
| * simply implement TFutureInterface!Interface. It manages a pool of clients, |
| * but allows the user to specify a custom accumulator function to use or to |
| * iterate over the results using a TFutureAggregatorRange. |
| * |
| * For each service method, TAsyncAggregator offers a method |
| * accepting the same arguments, and an optional TCancellation instance, just |
| * like with TFutureInterface. The return type, however, is a proxy object |
| * that offers the following methods: |
| * --- |
| * /++ |
| * + Returns a thrift.util.future.TFutureAggregatorRange for the results of |
| * + the client pool method invocations. |
| * + |
| * + The [] (slicing) operator can also be used to obtain the range. |
| * + |
| * + Params: |
| * + timeout = A timeout to pass to the TFutureAggregatorRange constructor, |
| * + defaults to zero (no timeout). |
| * +/ |
| * TFutureAggregatorRange!ReturnType range(Duration timeout = dur!"hnsecs"(0)); |
| * auto opSlice() { return range(); } /// Ditto |
| * |
| * /++ |
| * + Returns a future that gathers the results from the clients in the pool |
| * + and invokes a user-supplied accumulator function on them, returning its |
| * + return value to the client. |
| * + |
| * + In addition to the TFuture!AccumulatedType interface (where |
| * + AccumulatedType is the return type of the accumulator function), the |
| * + returned object also offers two additional methods, finish() and |
| * + finishGet(): By default, the accumulator functions is called after all |
| * + the results from the pool clients have become available. Calling finish() |
| * + causes the accumulator future to stop waiting for other results and |
| * + immediately invoking the accumulator function on the results currently |
| * + available. If all results are already available, finish() is a no-op. |
| * + finishGet() is a convenience shortcut for combining it with |
| * + a call to get() immediately afterwards, like waitGet() is for wait(). |
| * + |
| * + The acc alias can point to any callable accepting either an array of |
| * + return values or an array of return values and an array of exceptions; |
| * + see isAccumulator!() for details. The default accumulator concatenates |
| * + return values that can be concatenated with each others (e.g. arrays), |
| * + and simply returns an array of values otherwise, failing with a |
| * + TCompoundOperationException no values were returned. |
| * + |
| * + The accumulator function is not executed in any of the async manager |
| * + worker threads associated with the async clients, but instead it is |
| * + invoked when the actual result is requested for the first time after the |
| * + operation has been completed. This also includes checking the status |
| * + of the operation once it is no longer running, since the accumulator |
| * + has to be run to determine whether the operation succeeded or failed. |
| * +/ |
| * auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!acc); |
| * --- |
| * |
| * Example: |
| * --- |
| * // Some Thrift service. |
| * interface Foo { |
| * int foo(string name); |
| * byte[] bar(); |
| * } |
| * |
| * // Create the aggregator pool – client0, client1, client2 are some |
| * // TAsyncClient!Foo instances, but in theory could also be other |
| * // TFutureInterface!Foo implementations (e.g. some async client pool). |
| * auto pool = new TAsyncAggregator!Foo([client0, client1, client2]); |
| * |
| * foreach (val; pool.foo("baz").range(dur!"seconds"(1))) { |
| * // Process all the results that are available before a second has passed, |
| * // in the order they arrive. |
| * writeln(val); |
| * } |
| * |
| * auto sumRoots = pool.foo("baz").accumulate!((int[] vals, Exceptions[] exs){ |
| * if (vals.empty) { |
| * throw new TCompoundOperationException("All clients failed", exs); |
| * } |
| * |
| * // Just to illustrate that the type of the values can change, convert the |
| * // numbers to double and sum up their roots. |
| * double result = 0; |
| * foreach (v; vals) result += sqrt(cast(double)v); |
| * return result; |
| * })(); |
| * |
| * // Wait up to three seconds for the result, and then accumulate what has |
| * // arrived so far. |
| * sumRoots.completion.wait(dur!"seconds"(3)); |
| * writeln(sumRoots.finishGet()); |
| * |
| * // For scalars, the default accumulator returns an array of the values. |
| * pragma(msg, typeof(pool.foo("").accumulate().get()); // int[]. |
| * |
| * // For lists, etc., it concatenates the results together. |
| * pragma(msg, typeof(pool.bar().accumulate().get())); // byte[]. |
| * --- |
| * |
| * Note: For the accumulate!() interface, you might currently hit a »cannot use |
| * local '…' as parameter to non-global template accumulate«-error, see |
| * $(DMDBUG 5710, DMD issue 5710). If your accumulator function does not need |
| * to access the surrounding scope, you might want to use a function literal |
| * instead of a delegate to avoid the issue. |
| */ |
| class TAsyncAggregator(Interface) if (isBaseService!Interface) { |
| /// Shorthand for the client type this instance operates on. |
| alias TAsyncClientBase!Interface Client; |
| |
| /// |
| this(Client[] clients) { |
| clients_ = clients; |
| } |
| |
| /// Whether to open the underlying transports of a client before trying to |
| /// execute a method if they are not open. This is usually desirable |
| /// because it allows e.g. to automatically reconnect to a remote server |
| /// if the network connection is dropped. |
| /// |
| /// Defaults to true. |
| bool reopenTransports = true; |
| |
| mixin AggregatorOpDispatch!(); |
| |
| private: |
| Client[] clients_; |
| } |
| |
| /// Ditto |
| class TAsyncAggregator(Interface) if (isDerivedService!Interface) : |
| TAsyncAggregator!(BaseService!Interface) |
| { |
| /// Shorthand for the client type this instance operates on. |
| alias TAsyncClientBase!Interface Client; |
| |
| /// |
| this(Client[] clients) { |
| super(cast(TAsyncClientBase!(BaseService!Interface)[])clients); |
| } |
| |
| mixin AggregatorOpDispatch!(); |
| } |
| |
| /** |
| * Whether fun is a valid accumulator function for values of type ValueType. |
| * |
| * For this to be true, fun must be a callable matching one of the following |
| * argument lists: |
| * --- |
| * fun(ValueType[] values); |
| * fun(ValueType[] values, Exception[] exceptions); |
| * --- |
| * |
| * The second version is passed the collected array exceptions from all the |
| * clients in the pool. |
| * |
| * The return value of the accumulator function is passed to the client (via |
| * the result future). If it throws an exception, the operation is marked as |
| * failed with the given exception instead. |
| */ |
| template isAccumulator(ValueType, alias fun) { |
| enum isAccumulator = is(typeof(fun(cast(ValueType[])[]))) || |
| is(typeof(fun(cast(ValueType[])[], cast(Exception[])[]))); |
| } |
| |
| /** |
| * TAsyncAggregator construction helper to avoid having to explicitly |
| * specify the interface type, i.e. to allow the constructor being called |
| * using IFTI (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)). |
| */ |
| TAsyncAggregator!Interface tAsyncAggregator(Interface)( |
| TAsyncClientBase!Interface[] clients |
| ) if (isService!Interface) { |
| return new typeof(return)(clients); |
| } |
| |
| private { |
| mixin template AggregatorOpDispatch() { |
| auto opDispatch(string name, Args...)(Args args) if ( |
| is(typeof(mixin("Interface.init." ~ name)(args))) |
| ) { |
| alias ReturnType!(MemberType!(Interface, name)) ResultType; |
| |
| auto childCancellation = new TCancellationOrigin; |
| |
| TFuture!ResultType[] futures; |
| futures.reserve(clients_.length); |
| |
| foreach (c; cast(Client[])clients_) { |
| if (reopenTransports) { |
| if (!c.transport.isOpen) { |
| try { |
| c.transport.open(); |
| } catch (Exception e) { |
| continue; |
| } |
| } |
| } |
| futures ~= mixin("c." ~ name)(args, childCancellation); |
| } |
| |
| return AggregationResult!ResultType(futures, childCancellation); |
| } |
| } |
| |
| struct AggregationResult(T) { |
| auto opSlice() { |
| return range(); |
| } |
| |
| auto range(Duration timeout = dur!"hnsecs"(0)) { |
| return tFutureAggregatorRange(futures_, childCancellation_, timeout); |
| } |
| |
| auto accumulate(alias acc = defaultAccumulator)() if (isAccumulator!(T, acc)) { |
| return new AccumulatorJob!(T, acc)(futures_, childCancellation_); |
| } |
| |
| private: |
| TFuture!T[] futures_; |
| TCancellationOrigin childCancellation_; |
| } |
| |
| auto defaultAccumulator(T)(T[] values, Exception[] exceptions) { |
| if (values.empty) { |
| throw new TCompoundOperationException("All clients failed", |
| exceptions); |
| } |
| |
| static if (is(typeof(T.init ~ T.init))) { |
| import std.algorithm; |
| return reduce!"a ~ b"(values); |
| } else { |
| return values; |
| } |
| } |
| |
| final class AccumulatorJob(T, alias accumulator) if ( |
| isAccumulator!(T, accumulator) |
| ) : TFuture!(AccumulatorResult!(T, accumulator)) { |
| this(TFuture!T[] futures, TCancellationOrigin childCancellation) { |
| futures_ = futures; |
| childCancellation_ = childCancellation; |
| resultMutex_ = new Mutex; |
| completionEvent_ = new TOneshotEvent; |
| |
| foreach (future; futures) { |
| future.completion.addCallback({ |
| auto f = future; |
| return { |
| synchronized (resultMutex_) { |
| if (f.status == TFutureStatus.CANCELLED) { |
| if (!finished_) { |
| status_ = TFutureStatus.CANCELLED; |
| finished_ = true; |
| } |
| return; |
| } |
| |
| if (f.status == TFutureStatus.FAILED) { |
| exceptions_ ~= f.getException(); |
| } else { |
| results_ ~= f.get(); |
| } |
| |
| if (results_.length + exceptions_.length == futures_.length) { |
| finished_ = true; |
| completionEvent_.trigger(); |
| } |
| } |
| }; |
| }()); |
| } |
| } |
| |
| TFutureStatus status() @property { |
| synchronized (resultMutex_) { |
| if (!finished_) return TFutureStatus.RUNNING; |
| if (status_ != TFutureStatus.RUNNING) return status_; |
| |
| try { |
| result_ = invokeAccumulator!accumulator(results_, exceptions_); |
| status_ = TFutureStatus.SUCCEEDED; |
| } catch (Exception e) { |
| exception_ = e; |
| status_ = TFutureStatus.FAILED; |
| } |
| |
| return status_; |
| } |
| } |
| |
| TAwaitable completion() @property { |
| return completionEvent_; |
| } |
| |
| AccumulatorResult!(T, accumulator) get() { |
| auto s = status; |
| |
| enforce(s != TFutureStatus.RUNNING, |
| new TFutureException("Operation not yet completed.")); |
| |
| if (s == TFutureStatus.CANCELLED) throw new TCancelledException; |
| if (s == TFutureStatus.FAILED) throw exception_; |
| return result_; |
| } |
| |
| Exception getException() { |
| auto s = status; |
| enforce(s != TFutureStatus.RUNNING, |
| new TFutureException("Operation not yet completed.")); |
| |
| if (s == TFutureStatus.CANCELLED) throw new TCancelledException; |
| |
| if (s == TFutureStatus.SUCCEEDED) { |
| return null; |
| } |
| return exception_; |
| } |
| |
| void finish() { |
| synchronized (resultMutex_) { |
| if (!finished_) { |
| finished_ = true; |
| childCancellation_.trigger(); |
| completionEvent_.trigger(); |
| } |
| } |
| } |
| |
| auto finishGet() { |
| finish(); |
| return get(); |
| } |
| |
| private: |
| TFuture!T[] futures_; |
| TCancellationOrigin childCancellation_; |
| |
| bool finished_; |
| T[] results_; |
| Exception[] exceptions_; |
| |
| TFutureStatus status_; |
| Mutex resultMutex_; |
| union { |
| AccumulatorResult!(T, accumulator) result_; |
| Exception exception_; |
| } |
| TOneshotEvent completionEvent_; |
| } |
| |
| auto invokeAccumulator(alias accumulator, T)( |
| T[] values, Exception[] exceptions |
| ) if ( |
| isAccumulator!(T, accumulator) |
| ) { |
| static if (is(typeof(accumulator(values, exceptions)))) { |
| return accumulator(values, exceptions); |
| } else { |
| return accumulator(values); |
| } |
| } |
| |
| template AccumulatorResult(T, alias acc) { |
| alias typeof(invokeAccumulator!acc(cast(T[])[], cast(Exception[])[])) |
| AccumulatorResult; |
| } |
| } |