[Performance] Optimize CompletableFuture timeout handling (#10065)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5a41037..abac85b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -23,15 +23,13 @@
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -60,6 +58,7 @@
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +67,7 @@
*/
public class BrokersBase extends PulsarWebResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
+ private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10);
@GET
@Path("/{cluster}")
@@ -346,13 +346,10 @@
healthcheckReadLoop(readerFuture, completePromise, messageStr);
// timeout read loop after 10 seconds
- ScheduledFuture<?> timeout = pulsar().getExecutor().schedule(() -> {
- completePromise.completeExceptionally(new TimeoutException("Timed out reading"));
- }, 10, TimeUnit.SECONDS);
- // don't leave timeout dangling
- completePromise.whenComplete((ignore2, exception2) -> {
- timeout.cancel(false);
- });
+ FutureUtil.addTimeoutHandling(completePromise,
+ HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(),
+ () -> FutureUtil.createTimeoutException("Timed out reading", getClass(),
+ "healthcheck(...)"));
}
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2af76e9..1de8d9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -45,6 +45,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -179,6 +180,13 @@
@Setter(AccessLevel.PROTECTED)
public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> {
private static final Logger log = LoggerFactory.getLogger(BrokerService.class);
+ private static final Duration FUTURE_DEADLINE_TIMEOUT_DURATION = Duration.ofSeconds(60);
+ private static final TimeoutException FUTURE_DEADLINE_TIMEOUT_EXCEPTION =
+ FutureUtil.createTimeoutException("Future didn't finish within deadline", BrokerService.class,
+ "futureWithDeadline(...)");
+ private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
+ FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class,
+ "futureWithDeadline(...)");
private final PulsarService pulsar;
private final ManagedLedgerFactory managedLedgerFactory;
@@ -847,7 +855,7 @@
}
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topic, e);
- return failedFuture(e);
+ return FutureUtil.failedFuture(e);
} catch (RuntimeException e) {
Throwable cause = e.getCause();
if (cause instanceof ServiceUnitNotReadyException) {
@@ -856,7 +864,7 @@
log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e);
}
- return failedFuture(cause);
+ return FutureUtil.failedFuture(cause);
}
}
@@ -964,25 +972,9 @@
return topicFuture;
}
- private static <T> CompletableFuture<T> failedFuture(Throwable t) {
- CompletableFuture<T> future = new CompletableFuture<>();
- future.completeExceptionally(t);
- return future;
- }
-
- private <T> CompletableFuture<T> futureWithDeadline(Long delay, TimeUnit unit, Exception exp) {
- CompletableFuture<T> future = new CompletableFuture<T>();
- executor().schedule(() -> {
- if (!future.isDone()) {
- future.completeExceptionally(exp);
- }
- }, delay, unit);
- return future;
- }
-
private <T> CompletableFuture<T> futureWithDeadline() {
- return futureWithDeadline(60000L, TimeUnit.MILLISECONDS,
- new TimeoutException("Future didn't finish within deadline"));
+ return FutureUtil.createFutureWithTimeout(FUTURE_DEADLINE_TIMEOUT_DURATION, executor(),
+ () -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION);
}
public PulsarClient getReplicationClient(String cluster) {
@@ -1093,9 +1085,9 @@
*/
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
boolean createIfMissing) throws RuntimeException {
- final CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline(
- pulsar.getConfiguration().getTopicLoadTimeoutSeconds(),
- TimeUnit.SECONDS, new TimeoutException("Failed to load topic within timeout"));
+ final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
+ Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
+ () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index bee9878..800182e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,15 +21,13 @@
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -62,6 +60,7 @@
private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
private static final int MAX_OUTSTANDING = 500;
private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
+ public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10);
public TwoPhaseCompactor(ServiceConfiguration conf,
PulsarClient pulsar,
@@ -116,7 +115,9 @@
return;
}
CompletableFuture<RawMessage> future = reader.readNextAsync();
- scheduleTimeout(future);
+ FutureUtil.addTimeoutHandling(future,
+ PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
+ () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
future.thenAcceptAsync(m -> {
try {
@@ -172,15 +173,6 @@
});
}
- private void scheduleTimeout(CompletableFuture<RawMessage> future) {
- Future<?> timeout = scheduler.schedule(() -> {
- future.completeExceptionally(new TimeoutException("Timeout"));
- }, 10, TimeUnit.SECONDS);
- future.whenComplete((res, exception) -> {
- timeout.cancel(true);
- });
- }
-
private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
Map<String, MessageId> latestForKey, BookKeeper bk) {
Map<String, byte[]> metadata =
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index bdd5920..bf8492c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -26,13 +26,13 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.asynchttpclient.AsyncHttpClient;
@@ -71,10 +72,11 @@
*/
@Slf4j
public class AsyncHttpConnector implements Connector {
-
+ private static final TimeoutException READ_TIMEOUT_EXCEPTION =
+ FutureUtil.createTimeoutException("Read timeout", AsyncHttpConnector.class, "retryOrTimeout(...)");
@Getter
private final AsyncHttpClient httpClient;
- private final int readTimeout;
+ private final Duration readTimeout;
private final int maxRetries;
private final PulsarServiceNameResolver serviceNameResolver;
private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
@@ -156,7 +158,7 @@
}
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
- this.readTimeout = readTimeoutMs;
+ this.readTimeout = Duration.ofMillis(readTimeoutMs);
this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
}
@@ -216,7 +218,8 @@
private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
final CompletableFuture<Response> resultFuture = new CompletableFuture<>();
retryOperation(resultFuture, () -> oneShot(serviceNameResolver.resolveHost(), request), maxRetries);
- CompletableFuture<Response> timeoutAfter = timeoutAfter(readTimeout, TimeUnit.MILLISECONDS);
+ CompletableFuture<Response> timeoutAfter = FutureUtil.createFutureWithTimeout(readTimeout, delayer,
+ () -> READ_TIMEOUT_EXCEPTION);
return resultFuture.applyToEither(timeoutAfter, Function.identity());
}
@@ -297,12 +300,6 @@
return builder.execute().toCompletableFuture();
}
- public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
- CompletableFuture<T> result = new CompletableFuture<>();
- delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
- return result;
- }
-
@Override
public String getName() {
return "Pulsar-Admin";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index b86ee10..53b6deb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -18,9 +18,15 @@
*/
package org.apache.pulsar.common.util;
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
/**
* This class is aimed at simplifying work with {@code CompletableFuture}.
@@ -50,4 +56,71 @@
return t;
}
}
+
+ /**
+ * Creates a new {@link CompletableFuture} instance with timeout handling.
+ *
+ * @param timeout the duration of the timeout
+ * @param executor the executor to use for scheduling the timeout
+ * @param exceptionSupplier the supplier for creating the exception
+ * @param <T> type parameter for the future
+ * @return the new {@link CompletableFuture} instance
+ */
+ public static <T> CompletableFuture<T> createFutureWithTimeout(Duration timeout,
+ ScheduledExecutorService executor,
+ Supplier<Throwable> exceptionSupplier) {
+ return addTimeoutHandling(new CompletableFuture<>(), timeout, executor, exceptionSupplier);
+ }
+
+ /**
+ * Adds timeout handling to an existing {@link CompletableFuture}.
+ *
+ * @param future the target future
+ * @param timeout the duration of the timeout
+ * @param executor the executor to use for scheduling the timeout
+ * @param exceptionSupplier the supplier for creating the exception
+ * @param <T> type parameter for the future
+ * @return returns the original target future
+ */
+ public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> future, Duration timeout,
+ ScheduledExecutorService executor,
+ Supplier<Throwable> exceptionSupplier) {
+ ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
+ if (!future.isDone()) {
+ future.completeExceptionally(exceptionSupplier.get());
+ }
+ }, timeout.toMillis(), TimeUnit.MILLISECONDS);
+ future.whenComplete((res, exception) -> scheduledFuture.cancel(false));
+ return future;
+ }
+
+ /**
+ * Creates a low-overhead timeout exception which is performance optimized to minimize allocations
+ * and cpu consumption. It sets the stacktrace of the exception to the given source class and
+ * source method name. The instances of this class can be cached or stored as constants and reused
+ * multiple times.
+ *
+ * @param message exception message
+ * @param sourceClass source class for manually filled in stacktrace
+ * @param sourceMethod source method name for manually filled in stacktrace
+ * @return new TimeoutException instance
+ */
+ public static TimeoutException createTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
+ return new LowOverheadTimeoutException(message, sourceClass, sourceMethod);
+ }
+
+ private static class LowOverheadTimeoutException extends TimeoutException {
+ private static final long serialVersionUID = 1L;
+
+ LowOverheadTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
+ super(message);
+ setStackTrace(new StackTraceElement[]{new StackTraceElement(sourceClass.getName(), sourceMethod,
+ null, -1)});
+ }
+
+ @Override
+ public synchronized Throwable fillInStackTrace() {
+ return this;
+ }
+ }
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
new file mode 100644
index 0000000..8378aa5
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import org.testng.annotations.Test;
+
+public class FutureUtilTest {
+
+ @Test
+ public void testCreateTimeoutException() {
+ TimeoutException timeoutException = FutureUtil.createTimeoutException("hello world", getClass(), "test(...)");
+ assertNotNull(timeoutException);
+ assertEquals(timeoutException.getMessage(), "hello world");
+ StringWriter stringWriter = new StringWriter();
+ timeoutException.printStackTrace(new PrintWriter(stringWriter, true));
+ assertEquals(stringWriter.toString(),
+ "org.apache.pulsar.common.util.FutureUtil$LowOverheadTimeoutException: "
+ + "hello world\n"
+ + "\tat org.apache.pulsar.common.util.FutureUtilTest.test(...)(Unknown Source)\n");
+ }
+
+ @Test
+ public void testTimeoutHandling() {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ Exception e = new Exception();
+ try {
+ FutureUtil.addTimeoutHandling(future, Duration.ofMillis(1), executor, () -> e);
+ future.get();
+ fail("Should have failed.");
+ } catch (InterruptedException interruptedException) {
+ fail("Shouldn't occur");
+ } catch (ExecutionException executionException) {
+ assertEquals(executionException.getCause(), e);
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testTimeoutHandlingNoTimeout() throws ExecutionException, InterruptedException {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ try {
+ FutureUtil.addTimeoutHandling(future, Duration.ofMillis(100), executor, () -> new Exception());
+ future.complete(null);
+ future.get();
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+ @Test
+ public void testCreatingFutureWithTimeoutHandling() {
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ Exception e = new Exception();
+ try {
+ CompletableFuture<Void> future = FutureUtil.createFutureWithTimeout(Duration.ofMillis(1), executor,
+ () -> e);
+ future.get();
+ fail("Should have failed.");
+ } catch (InterruptedException interruptedException) {
+ fail("Shouldn't occur");
+ } catch (ExecutionException executionException) {
+ assertEquals(executionException.getCause(), e);
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+}
\ No newline at end of file