[Performance] Optimize CompletableFuture timeout handling (#10065)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5a41037..abac85b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -23,15 +23,13 @@
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -60,6 +58,7 @@
 import org.apache.pulsar.common.conf.InternalConfigurationData;
 import org.apache.pulsar.common.policies.data.BrokerInfo;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +67,7 @@
  */
 public class BrokersBase extends PulsarWebResource {
     private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
+    private static final Duration HEALTHCHECK_READ_TIMEOUT = Duration.ofSeconds(10);
 
     @GET
     @Path("/{cluster}")
@@ -346,13 +346,10 @@
                         healthcheckReadLoop(readerFuture, completePromise, messageStr);
 
                         // timeout read loop after 10 seconds
-                        ScheduledFuture<?> timeout = pulsar().getExecutor().schedule(() -> {
-                                completePromise.completeExceptionally(new TimeoutException("Timed out reading"));
-                            }, 10, TimeUnit.SECONDS);
-                        // don't leave timeout dangling
-                        completePromise.whenComplete((ignore2, exception2) -> {
-                                timeout.cancel(false);
-                            });
+                        FutureUtil.addTimeoutHandling(completePromise,
+                                HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(),
+                                () -> FutureUtil.createTimeoutException("Timed out reading", getClass(),
+                                        "healthcheck(...)"));
                     }
                 });
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 2af76e9..1de8d9f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -45,6 +45,7 @@
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -179,6 +180,13 @@
 @Setter(AccessLevel.PROTECTED)
 public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> {
     private static final Logger log = LoggerFactory.getLogger(BrokerService.class);
+    private static final Duration FUTURE_DEADLINE_TIMEOUT_DURATION = Duration.ofSeconds(60);
+    private static final TimeoutException FUTURE_DEADLINE_TIMEOUT_EXCEPTION =
+            FutureUtil.createTimeoutException("Future didn't finish within deadline", BrokerService.class,
+                    "futureWithDeadline(...)");
+    private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION =
+            FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class,
+                    "futureWithDeadline(...)");
 
     private final PulsarService pulsar;
     private final ManagedLedgerFactory managedLedgerFactory;
@@ -847,7 +855,7 @@
             }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", topic, e);
-            return failedFuture(e);
+            return FutureUtil.failedFuture(e);
         } catch (RuntimeException e) {
             Throwable cause = e.getCause();
             if (cause instanceof ServiceUnitNotReadyException) {
@@ -856,7 +864,7 @@
                 log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e);
             }
 
-            return failedFuture(cause);
+            return FutureUtil.failedFuture(cause);
         }
     }
 
@@ -964,25 +972,9 @@
         return topicFuture;
     }
 
-    private static <T> CompletableFuture<T> failedFuture(Throwable t) {
-        CompletableFuture<T> future = new CompletableFuture<>();
-        future.completeExceptionally(t);
-        return future;
-    }
-
-    private <T> CompletableFuture<T> futureWithDeadline(Long delay, TimeUnit unit, Exception exp) {
-        CompletableFuture<T> future = new CompletableFuture<T>();
-        executor().schedule(() -> {
-            if (!future.isDone()) {
-                future.completeExceptionally(exp);
-            }
-        }, delay, unit);
-        return future;
-    }
-
     private <T> CompletableFuture<T> futureWithDeadline() {
-        return futureWithDeadline(60000L, TimeUnit.MILLISECONDS,
-                new TimeoutException("Future didn't finish within deadline"));
+        return FutureUtil.createFutureWithTimeout(FUTURE_DEADLINE_TIMEOUT_DURATION, executor(),
+                () -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION);
     }
 
     public PulsarClient getReplicationClient(String cluster) {
@@ -1093,9 +1085,9 @@
      */
     protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
             boolean createIfMissing) throws RuntimeException {
-        final CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline(
-                pulsar.getConfiguration().getTopicLoadTimeoutSeconds(),
-                TimeUnit.SECONDS, new TimeoutException("Failed to load topic within timeout"));
+        final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout(
+                Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(),
+                () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);
         if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
             if (log.isDebugEnabled()) {
                 log.debug("Broker is unable to load persistent topic {}", topic);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index bee9878..800182e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -21,15 +21,13 @@
 import com.google.common.collect.ImmutableMap;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -62,6 +60,7 @@
     private static final Logger log = LoggerFactory.getLogger(TwoPhaseCompactor.class);
     private static final int MAX_OUTSTANDING = 500;
     private static final String COMPACTED_TOPIC_LEDGER_PROPERTY = "CompactedTopicLedger";
+    public static final Duration PHASE_ONE_LOOP_READ_TIMEOUT = Duration.ofSeconds(10);
 
     public TwoPhaseCompactor(ServiceConfiguration conf,
                              PulsarClient pulsar,
@@ -116,7 +115,9 @@
             return;
         }
         CompletableFuture<RawMessage> future = reader.readNextAsync();
-        scheduleTimeout(future);
+        FutureUtil.addTimeoutHandling(future,
+                PHASE_ONE_LOOP_READ_TIMEOUT, scheduler,
+                () -> FutureUtil.createTimeoutException("Timeout", getClass(), "phaseOneLoop(...)"));
 
         future.thenAcceptAsync(m -> {
             try {
@@ -172,15 +173,6 @@
         });
     }
 
-    private void scheduleTimeout(CompletableFuture<RawMessage> future) {
-        Future<?> timeout = scheduler.schedule(() -> {
-            future.completeExceptionally(new TimeoutException("Timeout"));
-        }, 10, TimeUnit.SECONDS);
-        future.whenComplete((res, exception) -> {
-            timeout.cancel(true);
-        });
-    }
-
     private CompletableFuture<Long> phaseTwo(RawReader reader, MessageId from, MessageId to, MessageId lastReadId,
             Map<String, MessageId> latestForKey, BookKeeper bk) {
         Map<String, byte[]> metadata =
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
index bdd5920..bf8492c 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java
@@ -26,13 +26,13 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.time.Duration;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.function.Supplier;
@@ -50,6 +50,7 @@
 import org.apache.pulsar.client.api.KeyStoreParams;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.SecurityUtility;
 import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
 import org.asynchttpclient.AsyncHttpClient;
@@ -71,10 +72,11 @@
  */
 @Slf4j
 public class AsyncHttpConnector implements Connector {
-
+    private static final TimeoutException READ_TIMEOUT_EXCEPTION =
+            FutureUtil.createTimeoutException("Read timeout", AsyncHttpConnector.class, "retryOrTimeout(...)");
     @Getter
     private final AsyncHttpClient httpClient;
-    private final int readTimeout;
+    private final Duration readTimeout;
     private final int maxRetries;
     private final PulsarServiceNameResolver serviceNameResolver;
     private final ScheduledExecutorService delayer = Executors.newScheduledThreadPool(1,
@@ -156,7 +158,7 @@
             }
         }
         httpClient = new DefaultAsyncHttpClient(confBuilder.build());
-        this.readTimeout = readTimeoutMs;
+        this.readTimeout = Duration.ofMillis(readTimeoutMs);
         this.maxRetries = httpClient.getConfig().getMaxRequestRetry();
     }
 
@@ -216,7 +218,8 @@
     private CompletableFuture<Response> retryOrTimeOut(ClientRequest request) {
         final CompletableFuture<Response> resultFuture = new CompletableFuture<>();
         retryOperation(resultFuture, () -> oneShot(serviceNameResolver.resolveHost(), request), maxRetries);
-        CompletableFuture<Response> timeoutAfter = timeoutAfter(readTimeout, TimeUnit.MILLISECONDS);
+        CompletableFuture<Response> timeoutAfter = FutureUtil.createFutureWithTimeout(readTimeout, delayer,
+                () -> READ_TIMEOUT_EXCEPTION);
         return resultFuture.applyToEither(timeoutAfter, Function.identity());
     }
 
@@ -297,12 +300,6 @@
         return builder.execute().toCompletableFuture();
     }
 
-    public <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
-        CompletableFuture<T> result = new CompletableFuture<>();
-        delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
-        return result;
-    }
-
     @Override
     public String getName() {
         return "Pulsar-Admin";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index b86ee10..53b6deb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -18,9 +18,15 @@
  */
 package org.apache.pulsar.common.util;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
 
 /**
  * This class is aimed at simplifying work with {@code CompletableFuture}.
@@ -50,4 +56,71 @@
             return t;
         }
     }
+
+    /**
+     * Creates a new {@link CompletableFuture} instance with timeout handling.
+     *
+     * @param timeout the duration of the timeout
+     * @param executor the executor to use for scheduling the timeout
+     * @param exceptionSupplier the supplier for creating the exception
+     * @param <T> type parameter for the future
+     * @return the new {@link CompletableFuture} instance
+     */
+    public static <T> CompletableFuture<T> createFutureWithTimeout(Duration timeout,
+                                                                   ScheduledExecutorService executor,
+                                                                   Supplier<Throwable> exceptionSupplier) {
+        return addTimeoutHandling(new CompletableFuture<>(), timeout, executor, exceptionSupplier);
+    }
+
+    /**
+     * Adds timeout handling to an existing {@link CompletableFuture}.
+     *
+     * @param future the target future
+     * @param timeout the duration of the timeout
+     * @param executor the executor to use for scheduling the timeout
+     * @param exceptionSupplier the supplier for creating the exception
+     * @param <T> type parameter for the future
+     * @return returns the original target future
+     */
+    public static <T> CompletableFuture<T> addTimeoutHandling(CompletableFuture<T> future, Duration timeout,
+                                               ScheduledExecutorService executor,
+                                               Supplier<Throwable> exceptionSupplier) {
+        ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
+            if (!future.isDone()) {
+                future.completeExceptionally(exceptionSupplier.get());
+            }
+        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
+        future.whenComplete((res, exception) -> scheduledFuture.cancel(false));
+        return future;
+    }
+
+    /**
+     * Creates a low-overhead timeout exception which is performance optimized to minimize allocations
+     * and cpu consumption. It sets the stacktrace of the exception to the given source class and
+     * source method name. The instances of this class can be cached or stored as constants and reused
+     * multiple times.
+     *
+     * @param message exception message
+     * @param sourceClass source class for manually filled in stacktrace
+     * @param sourceMethod source method name for manually filled in stacktrace
+     * @return new TimeoutException instance
+     */
+    public static TimeoutException createTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
+        return new LowOverheadTimeoutException(message, sourceClass, sourceMethod);
+    }
+
+    private static class LowOverheadTimeoutException extends TimeoutException {
+        private static final long serialVersionUID = 1L;
+
+        LowOverheadTimeoutException(String message, Class<?> sourceClass, String sourceMethod) {
+            super(message);
+            setStackTrace(new StackTraceElement[]{new StackTraceElement(sourceClass.getName(), sourceMethod,
+                    null, -1)});
+        }
+
+        @Override
+        public synchronized Throwable fillInStackTrace() {
+            return this;
+        }
+    }
 }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
new file mode 100644
index 0000000..8378aa5
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.fail;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeoutException;
+import org.testng.annotations.Test;
+
+public class FutureUtilTest {
+
+    @Test
+    public void testCreateTimeoutException() {
+        TimeoutException timeoutException = FutureUtil.createTimeoutException("hello world", getClass(), "test(...)");
+        assertNotNull(timeoutException);
+        assertEquals(timeoutException.getMessage(), "hello world");
+        StringWriter stringWriter = new StringWriter();
+        timeoutException.printStackTrace(new PrintWriter(stringWriter, true));
+        assertEquals(stringWriter.toString(),
+                "org.apache.pulsar.common.util.FutureUtil$LowOverheadTimeoutException: "
+                + "hello world\n"
+                + "\tat org.apache.pulsar.common.util.FutureUtilTest.test(...)(Unknown Source)\n");
+    }
+
+    @Test
+    public void testTimeoutHandling() {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        Exception e = new Exception();
+        try {
+            FutureUtil.addTimeoutHandling(future, Duration.ofMillis(1), executor, () -> e);
+            future.get();
+            fail("Should have failed.");
+        } catch (InterruptedException interruptedException) {
+            fail("Shouldn't occur");
+        } catch (ExecutionException executionException) {
+            assertEquals(executionException.getCause(), e);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testTimeoutHandlingNoTimeout() throws ExecutionException, InterruptedException {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        try {
+            FutureUtil.addTimeoutHandling(future, Duration.ofMillis(100), executor, () -> new Exception());
+            future.complete(null);
+            future.get();
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testCreatingFutureWithTimeoutHandling() {
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        Exception e = new Exception();
+        try {
+            CompletableFuture<Void> future = FutureUtil.createFutureWithTimeout(Duration.ofMillis(1), executor,
+                    () -> e);
+            future.get();
+            fail("Should have failed.");
+        } catch (InterruptedException interruptedException) {
+            fail("Shouldn't occur");
+        } catch (ExecutionException executionException) {
+            assertEquals(executionException.getCause(), e);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+}
\ No newline at end of file