blob: ddfb7aed80be99c1459ca045e7392a2ca06e51dc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.common.concurrent;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.LongStream;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.distributedlog.util.OrderedScheduler;
import org.junit.Test;
/**
* Unit Test for {@link FutureUtils}.
*/
public class TestFutureUtils {
/**
* Test Exception.
*/
static class TestException extends IOException {
private static final long serialVersionUID = -6256482498453846308L;
public TestException() {
super("test-exception");
}
}
@Test
public void testComplete() throws Exception {
CompletableFuture<Long> future = FutureUtils.createFuture();
FutureUtils.complete(future, 1024L);
assertEquals(1024L, FutureUtils.result(future).longValue());
}
@Test(expected = TestException.class)
public void testCompleteExceptionally() throws Exception {
CompletableFuture<Long> future = FutureUtils.createFuture();
FutureUtils.completeExceptionally(future, new TestException());
FutureUtils.result(future);
}
@Test
public void testWhenCompleteAsync() throws Exception {
OrderedScheduler scheduler = OrderedScheduler.newBuilder()
.name("test-when-complete-async")
.corePoolSize(1)
.build();
AtomicLong resultHolder = new AtomicLong(0L);
CountDownLatch latch = new CountDownLatch(1);
CompletableFuture<Long> future = FutureUtils.createFuture();
FutureUtils.whenCompleteAsync(
future,
(result, cause) -> {
resultHolder.set(result);
latch.countDown();
},
scheduler,
new Object());
FutureUtils.complete(future, 1234L);
latch.await();
assertEquals(1234L, resultHolder.get());
}
@Test
public void testProxyToSuccess() throws Exception {
CompletableFuture<Long> src = FutureUtils.createFuture();
CompletableFuture<Long> target = FutureUtils.createFuture();
FutureUtils.proxyTo(src, target);
FutureUtils.complete(src, 10L);
assertEquals(10L, FutureUtils.result(target).longValue());
}
@Test(expected = TestException.class)
public void testProxyToFailure() throws Exception {
CompletableFuture<Long> src = FutureUtils.createFuture();
CompletableFuture<Long> target = FutureUtils.createFuture();
FutureUtils.proxyTo(src, target);
FutureUtils.completeExceptionally(src, new TestException());
FutureUtils.result(target);
}
@Test
public void testVoid() throws Exception {
CompletableFuture<Void> voidFuture = FutureUtils.Void();
assertTrue(voidFuture.isDone());
assertFalse(voidFuture.isCompletedExceptionally());
assertFalse(voidFuture.isCancelled());
}
@Test
public void testCollectEmptyList() throws Exception {
List<CompletableFuture<Integer>> futures = Lists.newArrayList();
List<Integer> result = FutureUtils.result(FutureUtils.collect(futures));
assertTrue(result.isEmpty());
}
@Test
public void testCollectTenItems() throws Exception {
List<CompletableFuture<Integer>> futures = Lists.newArrayList();
List<Integer> expectedResults = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
futures.add(FutureUtils.value(i));
expectedResults.add(i);
}
List<Integer> results = FutureUtils.result(FutureUtils.collect(futures));
assertEquals(expectedResults, results);
}
@Test(expected = TestException.class)
public void testCollectFailures() throws Exception {
List<CompletableFuture<Integer>> futures = Lists.newArrayList();
List<Integer> expectedResults = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
if (i == 9) {
futures.add(FutureUtils.value(i));
} else {
futures.add(FutureUtils.exception(new TestException()));
}
expectedResults.add(i);
}
FutureUtils.result(FutureUtils.collect(futures));
}
@Test
public void testWithinAlreadyDone() throws Exception {
OrderedScheduler scheduler = mock(OrderedScheduler.class);
CompletableFuture<Long> doneFuture = FutureUtils.value(1234L);
CompletableFuture<Long> withinFuture = FutureUtils.within(
doneFuture,
10,
TimeUnit.MILLISECONDS,
new TestException(),
scheduler,
1234L);
TimeUnit.MILLISECONDS.sleep(20);
assertTrue(withinFuture.isDone());
assertFalse(withinFuture.isCancelled());
assertFalse(withinFuture.isCompletedExceptionally());
verify(scheduler, times(0))
.schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
}
@Test
public void testWithinZeroTimeout() throws Exception {
OrderedScheduler scheduler = mock(OrderedScheduler.class);
CompletableFuture<Long> newFuture = FutureUtils.createFuture();
CompletableFuture<Long> withinFuture = FutureUtils.within(
newFuture,
0,
TimeUnit.MILLISECONDS,
new TestException(),
scheduler,
1234L);
TimeUnit.MILLISECONDS.sleep(20);
assertFalse(withinFuture.isDone());
assertFalse(withinFuture.isCancelled());
assertFalse(withinFuture.isCompletedExceptionally());
verify(scheduler, times(0))
.schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
}
@Test
public void testWithinCompleteBeforeTimeout() throws Exception {
OrderedScheduler scheduler = mock(OrderedScheduler.class);
ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
when(scheduler.schedule(anyObject(), any(Runnable.class), anyLong(), any(TimeUnit.class)))
.thenAnswer(invocationOnMock -> scheduledFuture);
CompletableFuture<Long> newFuture = FutureUtils.createFuture();
CompletableFuture<Long> withinFuture = FutureUtils.within(
newFuture,
Long.MAX_VALUE,
TimeUnit.MILLISECONDS,
new TestException(),
scheduler,
1234L);
assertFalse(withinFuture.isDone());
assertFalse(withinFuture.isCancelled());
assertFalse(withinFuture.isCompletedExceptionally());
newFuture.complete(5678L);
assertTrue(withinFuture.isDone());
assertFalse(withinFuture.isCancelled());
assertFalse(withinFuture.isCompletedExceptionally());
assertEquals((Long) 5678L, FutureUtils.result(withinFuture));
verify(scheduledFuture, times(1))
.cancel(eq(true));
}
@Test
public void testIgnoreSuccess() {
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
underlyFuture.complete(1234L);
assertTrue(ignoredFuture.isDone());
assertFalse(ignoredFuture.isCompletedExceptionally());
assertFalse(ignoredFuture.isCancelled());
}
@Test
public void testIgnoreFailure() {
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
underlyFuture.completeExceptionally(new TestException());
assertTrue(ignoredFuture.isDone());
assertFalse(ignoredFuture.isCompletedExceptionally());
assertFalse(ignoredFuture.isCancelled());
}
@Test
public void testEnsureSuccess() throws Exception {
CountDownLatch ensureLatch = new CountDownLatch(1);
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
ensureLatch.countDown();
});
underlyFuture.complete(1234L);
FutureUtils.result(ensuredFuture);
assertTrue(ensuredFuture.isDone());
assertFalse(ensuredFuture.isCompletedExceptionally());
assertFalse(ensuredFuture.isCancelled());
ensureLatch.await();
}
@Test
public void testEnsureFailure() throws Exception {
CountDownLatch ensureLatch = new CountDownLatch(1);
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
ensureLatch.countDown();
});
underlyFuture.completeExceptionally(new TestException());
FutureUtils.result(FutureUtils.ignore(ensuredFuture));
assertTrue(ensuredFuture.isDone());
assertTrue(ensuredFuture.isCompletedExceptionally());
assertFalse(ensuredFuture.isCancelled());
ensureLatch.await();
}
@Test
public void testRescueSuccess() throws Exception {
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
Function<Throwable, CompletableFuture<Long>> rescueFuc = mock(Function.class);
CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, rescueFuc);
underlyFuture.complete(1234L);
FutureUtils.result(rescuedFuture);
assertTrue(rescuedFuture.isDone());
assertFalse(rescuedFuture.isCompletedExceptionally());
assertFalse(rescuedFuture.isCancelled());
verify(rescueFuc, times(0)).apply(any(Throwable.class));
}
@Test
public void testRescueFailure() throws Exception {
CompletableFuture<Long> futureCompletedAtRescue = FutureUtils.value(3456L);
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, (cause) -> futureCompletedAtRescue);
underlyFuture.completeExceptionally(new TestException());
FutureUtils.result(rescuedFuture);
assertTrue(rescuedFuture.isDone());
assertFalse(rescuedFuture.isCompletedExceptionally());
assertFalse(rescuedFuture.isCancelled());
assertEquals((Long) 3456L, FutureUtils.result(rescuedFuture));
}
@Test
public void testStatsSuccess() throws Exception {
OpStatsLogger statsLogger = mock(OpStatsLogger.class);
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
CompletableFuture<Long> statsFuture = FutureUtils.stats(
underlyFuture,
statsLogger,
Stopwatch.createStarted());
underlyFuture.complete(1234L);
FutureUtils.result(statsFuture);
verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong());
}
@Test
public void testStatsFailure() throws Exception {
OpStatsLogger statsLogger = mock(OpStatsLogger.class);
CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
CompletableFuture<Long> statsFuture = FutureUtils.stats(
underlyFuture,
statsLogger,
Stopwatch.createStarted());
underlyFuture.completeExceptionally(new TestException());
FutureUtils.result(FutureUtils.ignore(statsFuture));
verify(statsLogger, times(1)).registerFailedEvent(anyLong());
}
@Test
public void testProcessListSuccess() throws Exception {
List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
List<Long> expectedList = Lists.transform(
longList,
aLong -> 2 * aLong);
Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
longList,
sumFunc,
null);
assertEquals(expectedList, FutureUtils.result(totalFuture));
}
@Test
public void testProcessEmptyList() throws Exception {
List<Long> longList = Lists.newArrayList();
List<Long> expectedList = Lists.transform(
longList,
aLong -> 2 * aLong);
Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
longList,
sumFunc,
null);
assertEquals(expectedList, FutureUtils.result(totalFuture));
}
@Test
public void testProcessListFailures() throws Exception {
List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
AtomicLong total = new AtomicLong(0L);
Function<Long, CompletableFuture<Long>> sumFunc = value -> {
if (value < 5) {
total.addAndGet(value);
return FutureUtils.value(2 * value);
} else {
return FutureUtils.exception(new TestException());
}
};
CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
longList,
sumFunc,
null);
try {
FutureUtils.result(totalFuture);
fail("Should fail with TestException");
} catch (TestException te) {
// as expected
}
assertEquals(10L, total.get());
}
}