| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.tuweni.concurrent.coroutines |
| |
| import kotlinx.coroutines.CancellableContinuation |
| import kotlinx.coroutines.CompletableDeferred |
| import kotlinx.coroutines.CoroutineScope |
| import kotlinx.coroutines.CoroutineStart |
| import kotlinx.coroutines.Deferred |
| import kotlinx.coroutines.Dispatchers |
| import kotlinx.coroutines.ExperimentalCoroutinesApi |
| import kotlinx.coroutines.InternalCoroutinesApi |
| import kotlinx.coroutines.Job |
| import kotlinx.coroutines.ObsoleteCoroutinesApi |
| import kotlinx.coroutines.newCoroutineContext |
| import kotlinx.coroutines.suspendCancellableCoroutine |
| import org.apache.tuweni.concurrent.AsyncResult |
| import org.apache.tuweni.concurrent.CompletableAsyncResult |
| import java.util.concurrent.CancellationException |
| import java.util.concurrent.CompletionException |
| import java.util.function.BiConsumer |
| import kotlin.Result |
| import kotlin.coroutines.Continuation |
| import kotlin.coroutines.CoroutineContext |
| import kotlin.coroutines.resume |
| import kotlin.coroutines.resumeWithException |
| |
| /** |
| * Starts new co-routine and returns its result as an implementation of [AsyncResult]. |
| * The running co-outine is cancelled when the resulting future is cancelled or otherwise completed. |
| * |
| * Co-routine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] |
| * argument. If the context does not have any dispatcher nor any other [ContinuationInterceptor], then |
| * [Dispatchers.Default] is used. The parent job is inherited from a [CoroutineScope] as well, but it can also be |
| * overridden with corresponding [coroutineContext] element. |
| * |
| * By default, the co-routine is immediately scheduled for execution. Other options can be specified via `start` |
| * parameter. See [CoroutineStart] for details. A value of [CoroutineStart.LAZY] is not supported (since |
| * `AsyncResult` framework does not provide the corresponding capability) and produces [IllegalArgumentException]. |
| * |
| * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are |
| * available for newly created co-routine. |
| * |
| * @param context Additional to [CoroutineScope.coroutineContext] context of the coroutine. |
| * @param start Co-routine start option. The default value is [CoroutineStart.DEFAULT]. |
| * @param block The co-routine code. |
| */ |
| @UseExperimental(InternalCoroutinesApi::class, ObsoleteCoroutinesApi::class, ExperimentalCoroutinesApi::class) |
| fun <T> CoroutineScope.asyncResult( |
| context: CoroutineContext = Dispatchers.Default, |
| start: CoroutineStart = CoroutineStart.DEFAULT, |
| block: suspend CoroutineScope.() -> T |
| ): AsyncResult<T> { |
| require(!start.isLazy) { "$start start is not supported" } |
| val newContext = this.newCoroutineContext(context) |
| val job = Job(newContext[Job]) |
| val coroutine = AsyncResultCoroutine<T>(newContext + job) |
| job.invokeOnCompletion { coroutine.asyncResult.cancel() } |
| coroutine.asyncResult.whenComplete { _, _ -> job.cancel() } |
| start(block, receiver = coroutine, completion = coroutine) // use the specified start strategy |
| return coroutine.asyncResult |
| } |
| |
| private class AsyncResultCoroutine<T>( |
| override val context: CoroutineContext, |
| val asyncResult: CompletableAsyncResult<T> = AsyncResult.incomplete() |
| ) : Continuation<T>, CoroutineScope { |
| override val coroutineContext: CoroutineContext get() = context |
| override fun resumeWith(result: Result<T>) { |
| result |
| .onSuccess { asyncResult.complete(it) } |
| .onFailure { asyncResult.completeExceptionally(it) } |
| } |
| } |
| |
| /** |
| * Converts this deferred value to an [AsyncResult]. |
| * The deferred value is cancelled when the returned [AsyncResult] is cancelled or otherwise completed. |
| */ |
| @UseExperimental(ExperimentalCoroutinesApi::class, ObsoleteCoroutinesApi::class) |
| fun <T> Deferred<T>.asAsyncResult(): AsyncResult<T> { |
| val asyncResult = AsyncResult.incomplete<T>() |
| asyncResult.whenComplete { _, _ -> cancel() } |
| invokeOnCompletion { |
| try { |
| asyncResult.complete(getCompleted()) |
| } catch (exception: Exception) { |
| asyncResult.completeExceptionally(exception) |
| } |
| } |
| return asyncResult |
| } |
| |
| /** |
| * Converts this [AsyncResult] to an instance of [Deferred]. |
| * The [AsyncResult] is cancelled when the resulting deferred is cancelled. |
| */ |
| @UseExperimental(ObsoleteCoroutinesApi::class) |
| fun <T> AsyncResult<T>.asDeferred(): Deferred<T> { |
| // Fast path if already completed |
| if (isDone) { |
| return try { |
| @Suppress("UNCHECKED_CAST") |
| CompletableDeferred(get() as T) |
| } catch (e: Throwable) { |
| // unwrap original cause from CompletionException |
| val original = (e as? CompletionException)?.cause ?: e |
| CompletableDeferred<T>().also { it.completeExceptionally(original) } |
| } |
| } |
| val result = CompletableDeferred<T>() |
| whenComplete { value, exception -> |
| if (exception == null) { |
| result.complete(value) |
| } else { |
| result.completeExceptionally(exception) |
| } |
| } |
| result.invokeOnCompletion { this.cancel() } |
| return result |
| } |
| |
| /** |
| * Awaits for completion of the [AsyncResult] without blocking a thread. |
| * |
| * This suspending function is cancellable. |
| * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function |
| * stops waiting for the [AsyncResult] and immediately resumes with [CancellationException]. |
| * |
| * Note, that [AsyncResult] does not support prompt removal of listeners, so on cancellation of this wait |
| * a few small objects will remain in the [AsyncResult] stack of completion actions until it completes itself. |
| * However, care is taken to clear the reference to the waiting coroutine itself, so that its memory can be |
| * released even if the [AsyncResult] never completes. |
| */ |
| suspend fun <T> AsyncResult<T>.await(): T { |
| // fast path when CompletableFuture is already done (does not suspend) |
| if (isDone) { |
| try { |
| @Suppress("UNCHECKED_CAST") |
| return get() as T |
| } catch (e: CompletionException) { |
| throw e.cause ?: e // unwrap original cause from CompletionException |
| } |
| } |
| // slow path -- suspend |
| return suspendCancellableCoroutine { cont: CancellableContinuation<T> -> |
| val consumer = ContinuationBiConsumer(cont) |
| whenComplete(consumer) |
| cont.invokeOnCancellation { |
| consumer.cont = null // shall clear reference to continuation |
| } |
| } |
| } |
| |
| private class ContinuationBiConsumer<T>( |
| @Volatile @JvmField var cont: Continuation<T>? |
| ) : BiConsumer<T?, Throwable?> { |
| @Suppress("UNCHECKED_CAST") |
| override fun accept(result: T?, exception: Throwable?) { |
| val cont = this.cont ?: return // atomically read current value unless null |
| if (exception == null) { |
| // the future has been completed normally |
| cont.resume(result as T) |
| } else { |
| // the future has completed with an exception, unwrap it to provide consistent view of .await() result and to propagate only original exception |
| cont.resumeWithException((exception as? CompletionException)?.cause ?: exception) |
| } |
| } |
| } |