blob: ceb819dee8f6afb83ab1bb764ae79cb307cd9480 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* A unit test for KafkaFuture.
*/
@Timeout(120)
public class KafkaFutureTest {
/** Asserts that the given future is done, didn't fail and wasn't cancelled. */
private void assertIsSuccessful(KafkaFuture<?> future) {
assertTrue(future.isDone());
assertFalse(future.isCompletedExceptionally());
assertFalse(future.isCancelled());
}
/** Asserts that the given future is done, failed and wasn't cancelled. */
private void assertIsFailed(KafkaFuture<?> future) {
assertTrue(future.isDone());
assertFalse(future.isCancelled());
assertTrue(future.isCompletedExceptionally());
}
/** Asserts that the given future is done, didn't fail and was cancelled. */
private void assertIsCancelled(KafkaFuture<?> future) {
assertTrue(future.isDone());
assertTrue(future.isCancelled());
assertTrue(future.isCompletedExceptionally());
assertThrows(CancellationException.class, () -> future.getNow(null));
assertThrows(CancellationException.class, () -> future.get(0, TimeUnit.MILLISECONDS));
}
private <T> void awaitAndAssertResult(KafkaFuture<T> future,
T expectedResult,
T alternativeValue) {
assertNotEquals(expectedResult, alternativeValue);
try {
assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES));
} catch (Exception e) {
throw new AssertionError("Unexpected exception", e);
}
try {
assertEquals(expectedResult, future.get());
} catch (Exception e) {
throw new AssertionError("Unexpected exception", e);
}
try {
assertEquals(expectedResult, future.getNow(alternativeValue));
} catch (Exception e) {
throw new AssertionError("Unexpected exception", e);
}
}
private Throwable awaitAndAssertFailure(KafkaFuture<?> future,
Class<? extends Throwable> expectedException,
String expectedMessage) {
ExecutionException executionException = assertThrows(ExecutionException.class, () -> future.get(5, TimeUnit.MINUTES));
assertEquals(expectedException, executionException.getCause().getClass());
assertEquals(expectedMessage, executionException.getCause().getMessage());
executionException = assertThrows(ExecutionException.class, future::get);
assertEquals(expectedException, executionException.getCause().getClass());
assertEquals(expectedMessage, executionException.getCause().getMessage());
executionException = assertThrows(ExecutionException.class, () -> future.getNow(null));
assertEquals(expectedException, executionException.getCause().getClass());
assertEquals(expectedMessage, executionException.getCause().getMessage());
return executionException.getCause();
}
private void awaitAndAssertCancelled(KafkaFuture<?> future, String expectedMessage) {
CancellationException cancellationException = assertThrows(CancellationException.class, () -> future.get(5, TimeUnit.MINUTES));
assertEquals(expectedMessage, cancellationException.getMessage());
assertEquals(CancellationException.class, cancellationException.getClass());
cancellationException = assertThrows(CancellationException.class, future::get);
assertEquals(expectedMessage, cancellationException.getMessage());
assertEquals(CancellationException.class, cancellationException.getClass());
cancellationException = assertThrows(CancellationException.class, () -> future.getNow(null));
assertEquals(expectedMessage, cancellationException.getMessage());
assertEquals(CancellationException.class, cancellationException.getClass());
}
private Object invokeOrThrow(final Method method, final Object obj, final Object... args) throws Throwable {
try {
return method.invoke(obj, args);
} catch (InvocationTargetException e) {
throw e.getCause();
}
}
@Test
public void testCompleteFutures() throws Exception {
KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
assertTrue(future123.complete(123));
assertFalse(future123.complete(456));
assertFalse(future123.cancel(true));
assertEquals(Integer.valueOf(123), future123.get());
assertIsSuccessful(future123);
KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
assertFalse(future456.complete(789));
assertFalse(future456.cancel(true));
assertEquals(Integer.valueOf(456), future456.get());
assertIsSuccessful(future456);
}
@Test
public void testCompleteFuturesExceptionally() {
KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
assertTrue(futureFail.completeExceptionally(new RuntimeException("We require more vespene gas")));
assertIsFailed(futureFail);
assertFalse(futureFail.completeExceptionally(new RuntimeException("We require more minerals")));
assertFalse(futureFail.cancel(true));
ExecutionException executionException = assertThrows(ExecutionException.class, futureFail::get);
assertEquals(RuntimeException.class, executionException.getCause().getClass());
assertEquals("We require more vespene gas", executionException.getCause().getMessage());
KafkaFutureImpl<Integer> tricky1 = new KafkaFutureImpl<>();
assertTrue(tricky1.completeExceptionally(new CompletionException(new CancellationException())));
assertIsFailed(tricky1);
awaitAndAssertFailure(tricky1, CompletionException.class, "java.util.concurrent.CancellationException");
}
@Test
public void testCompleteFuturesViaCancellation() {
KafkaFutureImpl<Integer> viaCancel = new KafkaFutureImpl<>();
assertTrue(viaCancel.cancel(true));
assertIsCancelled(viaCancel);
awaitAndAssertCancelled(viaCancel, null);
KafkaFutureImpl<Integer> viaCancellationException = new KafkaFutureImpl<>();
assertTrue(viaCancellationException.completeExceptionally(new CancellationException("We require more vespene gas")));
assertIsCancelled(viaCancellationException);
awaitAndAssertCancelled(viaCancellationException, "We require more vespene gas");
}
@Test
public void testToString() {
KafkaFutureImpl<Integer> success = new KafkaFutureImpl<>();
assertEquals("KafkaFuture{value=null,exception=null,done=false}", success.toString());
success.complete(12);
assertEquals("KafkaFuture{value=12,exception=null,done=true}", success.toString());
KafkaFutureImpl<Integer> failure = new KafkaFutureImpl<>();
failure.completeExceptionally(new RuntimeException("foo"));
assertEquals("KafkaFuture{value=null,exception=java.lang.RuntimeException: foo,done=true}", failure.toString());
KafkaFutureImpl<Integer> tricky1 = new KafkaFutureImpl<>();
tricky1.completeExceptionally(new CompletionException(new CancellationException()));
assertEquals("KafkaFuture{value=null,exception=java.util.concurrent.CompletionException: java.util.concurrent.CancellationException,done=true}", tricky1.toString());
KafkaFutureImpl<Integer> cancelled = new KafkaFutureImpl<>();
cancelled.cancel(true);
assertEquals("KafkaFuture{value=null,exception=java.util.concurrent.CancellationException,done=true}", cancelled.toString());
}
@Test
public void testCompletingFutures() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
CompleterThread<String> myThread = new CompleterThread<>(future, "You must construct additional pylons.");
assertIsNotCompleted(future);
assertEquals("I am ready", future.getNow("I am ready"));
myThread.start();
awaitAndAssertResult(future, "You must construct additional pylons.", "I am ready");
assertIsSuccessful(future);
myThread.join();
assertNull(myThread.testException);
}
@Test
public void testCompletingFuturesExceptionally() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
CompleterThread<String> myThread = new CompleterThread<>(future, null,
new RuntimeException("Ultimate efficiency achieved."));
assertIsNotCompleted(future);
assertEquals("I am ready", future.getNow("I am ready"));
myThread.start();
awaitAndAssertFailure(future, RuntimeException.class, "Ultimate efficiency achieved.");
assertIsFailed(future);
myThread.join();
assertNull(myThread.testException);
}
@Test
public void testCompletingFuturesViaCancellation() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
CompleterThread<String> myThread = new CompleterThread<>(future, null,
new CancellationException("Ultimate efficiency achieved."));
assertIsNotCompleted(future);
assertEquals("I am ready", future.getNow("I am ready"));
myThread.start();
awaitAndAssertCancelled(future, "Ultimate efficiency achieved.");
assertIsCancelled(future);
myThread.join();
assertNull(myThread.testException);
}
private void assertIsNotCompleted(KafkaFutureImpl<String> future) {
assertFalse(future.isDone());
assertFalse(future.isCompletedExceptionally());
assertFalse(future.isCancelled());
}
@Test
public void testThenApplyOnSucceededFuture() throws Exception {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> doubledFuture = future.thenApply(integer -> 2 * integer);
assertFalse(doubledFuture.isDone());
KafkaFuture<Integer> tripledFuture = future.thenApply(integer -> 3 * integer);
assertFalse(tripledFuture.isDone());
future.complete(21);
assertEquals(Integer.valueOf(21), future.getNow(-1));
assertEquals(Integer.valueOf(42), doubledFuture.getNow(-1));
assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
KafkaFuture<Integer> quadrupledFuture = future.thenApply(integer -> 4 * integer);
assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
}
@Test
public void testThenApplyOnFailedFuture() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * integer);
future.completeExceptionally(new RuntimeException("We require more vespene gas"));
assertIsFailed(future);
assertIsFailed(dependantFuture);
awaitAndAssertFailure(future, RuntimeException.class, "We require more vespene gas");
awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We require more vespene gas");
}
@Test
public void testThenApplyOnFailedFutureTricky() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * integer);
future.completeExceptionally(new CompletionException(new RuntimeException("We require more vespene gas")));
assertIsFailed(future);
assertIsFailed(dependantFuture);
awaitAndAssertFailure(future, CompletionException.class, "java.lang.RuntimeException: We require more vespene gas");
awaitAndAssertFailure(dependantFuture, CompletionException.class, "java.lang.RuntimeException: We require more vespene gas");
}
@Test
public void testThenApplyOnFailedFutureTricky2() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * integer);
future.completeExceptionally(new CompletionException(new CancellationException()));
assertIsFailed(future);
assertIsFailed(dependantFuture);
awaitAndAssertFailure(future, CompletionException.class, "java.util.concurrent.CancellationException");
awaitAndAssertFailure(dependantFuture, CompletionException.class, "java.util.concurrent.CancellationException");
}
@Test
public void testThenApplyOnSucceededFutureAndFunctionThrows() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
throw new RuntimeException("We require more vespene gas");
});
future.complete(21);
assertIsSuccessful(future);
assertIsFailed(dependantFuture);
awaitAndAssertResult(future, 21, null);
awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We require more vespene gas");
}
@Test
public void testThenApplyOnSucceededFutureAndFunctionThrowsCompletionException() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
throw new CompletionException(new RuntimeException("We require more vespene gas"));
});
future.complete(21);
assertIsSuccessful(future);
assertIsFailed(dependantFuture);
awaitAndAssertResult(future, 21, null);
Throwable cause = awaitAndAssertFailure(dependantFuture, CompletionException.class, "java.lang.RuntimeException: We require more vespene gas");
assertInstanceOf(RuntimeException.class, cause.getCause());
assertEquals(cause.getCause().getMessage(), "We require more vespene gas");
}
@Test
public void testThenApplyOnFailedFutureFunctionNotCalled() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
boolean[] ran = {false};
KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> {
// Because the top level future failed, this should never be called.
ran[0] = true;
return null;
});
future.completeExceptionally(new RuntimeException("We require more minerals"));
assertIsFailed(future);
assertIsFailed(dependantFuture);
awaitAndAssertFailure(future, RuntimeException.class, "We require more minerals");
awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We require more minerals");
assertFalse(ran[0]);
}
@Test
public void testThenApplyOnCancelledFuture() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> dependantFuture = future.thenApply(integer -> 2 * integer);
future.cancel(true);
assertIsCancelled(future);
assertIsCancelled(dependantFuture);
awaitAndAssertCancelled(future, null);
awaitAndAssertCancelled(dependantFuture, null);
}
@Test
public void testWhenCompleteOnSucceededFuture() throws Throwable {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
Throwable[] err = new Throwable[1];
boolean[] ran = {false};
KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, ex) -> {
ran[0] = true;
try {
assertEquals(Integer.valueOf(21), integer);
if (ex != null) {
throw ex;
}
} catch (Throwable e) {
err[0] = e;
}
});
assertFalse(dependantFuture.isDone());
assertTrue(future.complete(21));
assertTrue(ran[0]);
if (err[0] != null) {
throw err[0];
}
}
@Test
public void testWhenCompleteOnFailedFuture() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
Throwable[] err = new Throwable[1];
boolean[] ran = {false};
KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, ex) -> {
ran[0] = true;
err[0] = ex;
if (integer != null) {
err[0] = new AssertionError();
}
});
assertFalse(dependantFuture.isDone());
RuntimeException ex = new RuntimeException("We require more vespene gas");
assertTrue(future.completeExceptionally(ex));
assertTrue(ran[0]);
assertEquals(err[0], ex);
}
@Test
public void testWhenCompleteOnSucceededFutureAndConsumerThrows() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
boolean[] ran = {false};
KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, ex) -> {
ran[0] = true;
throw new RuntimeException("We require more minerals");
});
assertFalse(dependantFuture.isDone());
assertTrue(future.complete(21));
assertIsSuccessful(future);
assertTrue(ran[0]);
assertIsFailed(dependantFuture);
awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We require more minerals");
}
@Test
public void testWhenCompleteOnFailedFutureAndConsumerThrows() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
boolean[] ran = {false};
KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, ex) -> {
ran[0] = true;
throw new RuntimeException("We require more minerals");
});
assertFalse(dependantFuture.isDone());
assertTrue(future.completeExceptionally(new RuntimeException("We require more vespene gas")));
assertIsFailed(future);
assertTrue(ran[0]);
assertIsFailed(dependantFuture);
awaitAndAssertFailure(dependantFuture, RuntimeException.class, "We require more vespene gas");
}
@Test
public void testWhenCompleteOnCancelledFuture() {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
Throwable[] err = new Throwable[1];
boolean[] ran = {false};
KafkaFuture<Integer> dependantFuture = future.whenComplete((integer, ex) -> {
ran[0] = true;
err[0] = ex;
if (integer != null) {
err[0] = new AssertionError();
}
});
assertFalse(dependantFuture.isDone());
assertTrue(future.cancel(true));
assertTrue(ran[0]);
assertInstanceOf(CancellationException.class, err[0]);
}
private static class CompleterThread<T> extends Thread {
private final KafkaFutureImpl<T> future;
private final T value;
private final Throwable exception;
Throwable testException = null;
CompleterThread(KafkaFutureImpl<T> future, T value) {
this.future = future;
this.value = value;
this.exception = null;
}
CompleterThread(KafkaFutureImpl<T> future, T value, Exception exception) {
this.future = future;
this.value = value;
this.exception = exception;
}
@Override
public void run() {
try {
try {
Thread.sleep(0, 200);
} catch (InterruptedException e) {
}
if (exception == null) {
future.complete(value);
} else {
future.completeExceptionally(exception);
}
} catch (Throwable testException) {
this.testException = testException;
}
}
}
private static class WaiterThread<T> extends Thread {
private final KafkaFutureImpl<T> future;
private final T expected;
Throwable testException = null;
WaiterThread(KafkaFutureImpl<T> future, T expected) {
this.future = future;
this.expected = expected;
}
@Override
public void run() {
try {
T value = future.get();
assertEquals(expected, value);
} catch (Throwable testException) {
this.testException = testException;
}
}
}
@Test
public void testAllOfFutures() throws Exception {
final int numThreads = 5;
final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
futures.add(new KafkaFutureImpl<>());
}
KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
final List<CompleterThread<Integer>> completerThreads = new ArrayList<>();
final List<WaiterThread<Integer>> waiterThreads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
completerThreads.add(new CompleterThread<>(futures.get(i), i));
waiterThreads.add(new WaiterThread<>(futures.get(i), i));
}
assertFalse(allFuture.isDone());
for (int i = 0; i < numThreads; i++) {
waiterThreads.get(i).start();
}
for (int i = 0; i < numThreads - 1; i++) {
completerThreads.get(i).start();
}
assertFalse(allFuture.isDone());
completerThreads.get(numThreads - 1).start();
allFuture.get();
assertIsSuccessful(allFuture);
for (int i = 0; i < numThreads; i++) {
assertEquals(Integer.valueOf(i), futures.get(i).get());
}
for (int i = 0; i < numThreads; i++) {
completerThreads.get(i).join();
waiterThreads.get(i).join();
assertNull(completerThreads.get(i).testException);
assertNull(waiterThreads.get(i).testException);
}
}
@Test
public void testAllOfFuturesWithFailure() throws Exception {
final int numThreads = 5;
final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
futures.add(new KafkaFutureImpl<>());
}
KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
final List<CompleterThread<Integer>> completerThreads = new ArrayList<>();
final List<WaiterThread<Integer>> waiterThreads = new ArrayList<>();
int lastIndex = numThreads - 1;
for (int i = 0; i < lastIndex; i++) {
completerThreads.add(new CompleterThread<>(futures.get(i), i));
waiterThreads.add(new WaiterThread<>(futures.get(i), i));
}
completerThreads.add(new CompleterThread<>(futures.get(lastIndex), null, new RuntimeException("Last one failed")));
waiterThreads.add(new WaiterThread<>(futures.get(lastIndex), lastIndex));
assertFalse(allFuture.isDone());
for (int i = 0; i < numThreads; i++) {
waiterThreads.get(i).start();
}
for (int i = 0; i < lastIndex; i++) {
completerThreads.get(i).start();
}
assertFalse(allFuture.isDone());
completerThreads.get(lastIndex).start();
awaitAndAssertFailure(allFuture, RuntimeException.class, "Last one failed");
assertIsFailed(allFuture);
for (int i = 0; i < lastIndex; i++) {
assertEquals(Integer.valueOf(i), futures.get(i).get());
}
assertIsFailed(futures.get(lastIndex));
for (int i = 0; i < numThreads; i++) {
completerThreads.get(i).join();
waiterThreads.get(i).join();
assertNull(completerThreads.get(i).testException);
if (i == lastIndex) {
assertEquals(ExecutionException.class, waiterThreads.get(i).testException.getClass());
assertEquals(RuntimeException.class, waiterThreads.get(i).testException.getCause().getClass());
assertEquals("Last one failed", waiterThreads.get(i).testException.getCause().getMessage());
} else {
assertNull(waiterThreads.get(i).testException);
}
}
}
@Test
public void testAllOfFuturesHandlesZeroFutures() throws Exception {
KafkaFuture<Void> allFuture = KafkaFuture.allOf();
assertTrue(allFuture.isDone());
assertFalse(allFuture.isCancelled());
assertFalse(allFuture.isCompletedExceptionally());
allFuture.get();
}
@Test
public void testFutureTimeoutWithZeroWait() {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
assertThrows(TimeoutException.class, () -> future.get(0, TimeUnit.MILLISECONDS));
}
@Test
@SuppressWarnings("unchecked")
public void testLeakCompletableFuture() throws Throwable {
final KafkaFutureImpl<String> kfut = new KafkaFutureImpl<>();
CompletableFuture<String> comfut = kfut.toCompletionStage().toCompletableFuture();
assertThrows(UnsupportedOperationException.class, () -> comfut.complete(""));
assertThrows(UnsupportedOperationException.class, () -> comfut.completeExceptionally(new RuntimeException()));
Method completeOnTimeout = CompletableFuture.class.getDeclaredMethod("completeOnTimeout", Object.class, Long.TYPE, TimeUnit.class);
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(completeOnTimeout, comfut, "", 1L, TimeUnit.MILLISECONDS));
Method completeAsync = CompletableFuture.class.getDeclaredMethod("completeAsync", Supplier.class);
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(completeAsync, comfut, (Supplier<String>) () -> ""));
Method obtrudeValue = CompletableFuture.class.getDeclaredMethod("obtrudeValue", Object.class);
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(obtrudeValue, comfut, ""));
Method obtrudeException = CompletableFuture.class.getDeclaredMethod("obtrudeException", Throwable.class);
assertThrows(UnsupportedOperationException.class, () -> invokeOrThrow(obtrudeException, comfut, new RuntimeException()));
// Check the CF from a minimal CompletionStage doesn't cause completion of the original KafkaFuture
Method minimal = CompletableFuture.class.getDeclaredMethod("minimalCompletionStage");
CompletionStage<String> cs = (CompletionStage<String>) invokeOrThrow(minimal, comfut);
cs.toCompletableFuture().complete("");
assertFalse(kfut.isDone());
assertFalse(comfut.isDone());
}
}