blob: 37cfd1eefa18fc23c64f82f42bb79a4422002d73 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertEquals;
/**
* A unit test for KafkaFuture.
*/
public class KafkaFutureTest {
@Rule
final public Timeout globalTimeout = Timeout.millis(120000);
@Test
public void testCompleteFutures() throws Exception {
KafkaFutureImpl<Integer> future123 = new KafkaFutureImpl<>();
assertTrue(future123.complete(123));
assertEquals(Integer.valueOf(123), future123.get());
assertFalse(future123.complete(456));
assertTrue(future123.isDone());
assertFalse(future123.isCancelled());
assertFalse(future123.isCompletedExceptionally());
KafkaFuture<Integer> future456 = KafkaFuture.completedFuture(456);
assertEquals(Integer.valueOf(456), future456.get());
KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
futureFail.completeExceptionally(new RuntimeException("We require more vespene gas"));
try {
futureFail.get();
Assert.fail("Expected an exception");
} catch (ExecutionException e) {
assertEquals(RuntimeException.class, e.getCause().getClass());
Assert.assertEquals("We require more vespene gas", e.getCause().getMessage());
}
}
@Test
public void testCompletingFutures() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
CompleterThread<String> myThread = new CompleterThread<>(future, "You must construct additional pylons.");
assertFalse(future.isDone());
assertFalse(future.isCompletedExceptionally());
assertFalse(future.isCancelled());
assertEquals("I am ready", future.getNow("I am ready"));
myThread.start();
String str = future.get(5, TimeUnit.MINUTES);
assertEquals("You must construct additional pylons.", str);
assertEquals("You must construct additional pylons.", future.getNow("I am ready"));
assertTrue(future.isDone());
assertFalse(future.isCompletedExceptionally());
assertFalse(future.isCancelled());
myThread.join();
assertEquals(null, myThread.testException);
}
@Test
public void testThenApply() throws Exception {
KafkaFutureImpl<Integer> future = new KafkaFutureImpl<>();
KafkaFuture<Integer> doubledFuture = future.thenApply(integer -> 2 * integer);
assertFalse(doubledFuture.isDone());
KafkaFuture<Integer> tripledFuture = future.thenApply(integer -> 3 * integer);
assertFalse(tripledFuture.isDone());
future.complete(21);
assertEquals(Integer.valueOf(21), future.getNow(-1));
assertEquals(Integer.valueOf(42), doubledFuture.getNow(-1));
assertEquals(Integer.valueOf(63), tripledFuture.getNow(-1));
KafkaFuture<Integer> quadrupledFuture = future.thenApply(integer -> 4 * integer);
assertEquals(Integer.valueOf(84), quadrupledFuture.getNow(-1));
KafkaFutureImpl<Integer> futureFail = new KafkaFutureImpl<>();
KafkaFuture<Integer> futureAppliedFail = futureFail.thenApply(integer -> 2 * integer);
futureFail.completeExceptionally(new RuntimeException());
assertTrue(futureFail.isCompletedExceptionally());
assertTrue(futureAppliedFail.isCompletedExceptionally());
}
private static class CompleterThread<T> extends Thread {
private final KafkaFutureImpl<T> future;
private final T value;
Throwable testException = null;
CompleterThread(KafkaFutureImpl<T> future, T value) {
this.future = future;
this.value = value;
}
@Override
public void run() {
try {
try {
Thread.sleep(0, 200);
} catch (InterruptedException e) {
}
future.complete(value);
} catch (Throwable testException) {
this.testException = testException;
}
}
}
private static class WaiterThread<T> extends Thread {
private final KafkaFutureImpl<T> future;
private final T expected;
Throwable testException = null;
WaiterThread(KafkaFutureImpl<T> future, T expected) {
this.future = future;
this.expected = expected;
}
@Override
public void run() {
try {
T value = future.get();
assertEquals(expected, value);
} catch (Throwable testException) {
this.testException = testException;
}
}
}
@Test
public void testAllOfFutures() throws Exception {
final int numThreads = 5;
final List<KafkaFutureImpl<Integer>> futures = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
futures.add(new KafkaFutureImpl<>());
}
KafkaFuture<Void> allFuture = KafkaFuture.allOf(futures.toArray(new KafkaFuture[0]));
final List<CompleterThread> completerThreads = new ArrayList<>();
final List<WaiterThread> waiterThreads = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
completerThreads.add(new CompleterThread<>(futures.get(i), i));
waiterThreads.add(new WaiterThread<>(futures.get(i), i));
}
assertFalse(allFuture.isDone());
for (int i = 0; i < numThreads; i++) {
waiterThreads.get(i).start();
}
for (int i = 0; i < numThreads - 1; i++) {
completerThreads.get(i).start();
}
assertFalse(allFuture.isDone());
completerThreads.get(numThreads - 1).start();
allFuture.get();
assertTrue(allFuture.isDone());
for (int i = 0; i < numThreads; i++) {
assertEquals(Integer.valueOf(i), futures.get(i).get());
}
for (int i = 0; i < numThreads; i++) {
completerThreads.get(i).join();
waiterThreads.get(i).join();
assertEquals(null, completerThreads.get(i).testException);
assertEquals(null, waiterThreads.get(i).testException);
}
}
@Test
public void testAllOfFuturesHandlesZeroFutures() throws Exception {
KafkaFuture<Void> allFuture = KafkaFuture.allOf();
assertTrue(allFuture.isDone());
assertFalse(allFuture.isCancelled());
assertFalse(allFuture.isCompletedExceptionally());
allFuture.get();
}
@Test(expected = TimeoutException.class)
public void testFutureTimeoutWithZeroWait() throws Exception {
final KafkaFutureImpl<String> future = new KafkaFutureImpl<>();
future.get(0, TimeUnit.MILLISECONDS);
}
}