ZOOKEEPER-4289: Reduce the performance impact of Prometheus metrics

Enabling the Prometheus metrics provider has a significant impact on the performance of both read and write operations. This change reduces that impact by making Prometheus summary reporting an asynchronous operation.

Load test results showed that the average read latency and throughput after the fix are on par with the performance when Prometheus is disabled. For writes, the average latency was reduced by 25% and the average throughput was increased by 20% after the fix.

Author: liwang <liwang@apple.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Andor Molnar <anmolnar@apache.org>

Closes #1698 from li4wang/ZOOKEEPER-4289
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 3ec44a2..a8650f1 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -2129,7 +2129,23 @@
 * *metricsProvider.exportJvmInfo* :
     If this property is set to **true** Prometheus.io will export useful metrics about the JVM.
     The default is true.
-
+    
+* *metricsProvider.numWorkerThreads* :
+   **New in 3.7.1:**
+   Number of worker threads for reporting Prometheus summary metrics. 
+   Default value is 1. 
+   If the number is less than 1, no worker threads are created and summary metrics are reported synchronously on the calling thread.
+    
+* *metricsProvider.maxQueueSize* :
+   **New in 3.7.1:**
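+   The maximum number of Prometheus summary metrics reporting tasks that can be queued for the worker threads. When the queue is full, new reporting tasks are rejected and their observations are not recorded.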
+   Default value is 1000000.
+   
+* *metricsProvider.workerShutdownTimeoutMs* :
+   **New in 3.7.1:**
+   The timeout in ms for Prometheus worker threads shutdown.
+   Default value is 1000ms.
+   
 <a name="Communication+using+the+Netty+framework"></a>
 
 ### Communication using the Netty framework
diff --git a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java
index a3d5d7a..bcafdd9 100644
--- a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java
+++ b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java
@@ -26,9 +26,18 @@
 import java.net.InetSocketAddress;
 import java.util.Enumeration;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -40,6 +49,7 @@
 import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
 import org.apache.zookeeper.metrics.Summary;
 import org.apache.zookeeper.metrics.SummarySet;
+import org.apache.zookeeper.server.RateLogger;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -56,6 +66,26 @@
     private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsProvider.class);
     private static final String LABEL = "key";
     private static final String[] LABELS = {LABEL};
+
+    /**
+     * Number of worker threads for reporting Prometheus summary metrics.
+     * Default value is 1.
+     * If the number is less than 1, metrics are reported synchronously on the calling thread.
+     */
+    static final String NUM_WORKER_THREADS = "numWorkerThreads";
+
+    /**
+     * The max queue size for Prometheus summary metrics reporting task.
+     * Default value is 1000000.
+     */
+    static final String MAX_QUEUE_SIZE = "maxQueueSize";
+
+    /**
+     * The timeout in ms for Prometheus worker threads shutdown.
+     * Default value is 1000ms.
+     */
+    static final String WORKER_SHUTDOWN_TIMEOUT_MS = "workerShutdownTimeoutMs";
+
     /**
      * We are using the 'defaultRegistry'.
      * <p>
@@ -64,12 +94,17 @@
      * </p>
      */
     private final CollectorRegistry collectorRegistry = CollectorRegistry.defaultRegistry;
+    private final RateLogger rateLogger = new RateLogger(LOG, 60 * 1000);
     private String host = "0.0.0.0";
     private int port = 7000;
     private boolean exportJvmInfo = true;
     private Server server;
     private final MetricsServletImpl servlet = new MetricsServletImpl();
     private final Context rootContext = new Context();
+    private int numWorkerThreads = 1;
+    private int maxQueueSize = 1000000;
+    private long workerShutdownTimeoutMs = 1000;
+    private Optional<ExecutorService> executorOptional = Optional.empty();
 
     @Override
     public void configure(Properties configuration) throws MetricsProviderLifeCycleException {
@@ -77,10 +112,17 @@
         this.host = configuration.getProperty("httpHost", "0.0.0.0");
         this.port = Integer.parseInt(configuration.getProperty("httpPort", "7000"));
         this.exportJvmInfo = Boolean.parseBoolean(configuration.getProperty("exportJvmInfo", "true"));
+        this.numWorkerThreads = Integer.parseInt(
+                configuration.getProperty(NUM_WORKER_THREADS, "1"));
+        this.maxQueueSize = Integer.parseInt(
+                configuration.getProperty(MAX_QUEUE_SIZE, "1000000"));
+        this.workerShutdownTimeoutMs = Long.parseLong(
+                configuration.getProperty(WORKER_SHUTDOWN_TIMEOUT_MS, "1000"));
     }
 
     @Override
     public void start() throws MetricsProviderLifeCycleException {
+        this.executorOptional = createExecutor();
         try {
             LOG.info("Starting /metrics HTTP endpoint at host: {}, port: {}, exportJvmInfo: {}",
                     host, port, exportJvmInfo);
@@ -120,6 +162,7 @@
 
     @Override
     public void stop() {
+        shutdownExecutor();
         if (server != null) {
             try {
                 server.stop();
@@ -331,7 +374,7 @@
 
     }
 
-    private class PrometheusSummary implements Summary {
+    class PrometheusSummary implements Summary {
 
         private final io.prometheus.client.Summary inner;
         private final String name;
@@ -355,16 +398,19 @@
 
         @Override
         public void add(long delta) {
+            reportMetrics(() -> observe(delta));
+        }
+
+        void observe(final long delta) {
             try {
                 inner.observe(delta);
-            } catch (IllegalArgumentException err) {
+            } catch (final IllegalArgumentException err) {
                 LOG.error("invalid delta {} for metric {}", delta, name, err);
             }
         }
-
     }
 
-    private class PrometheusLabelledSummary implements SummarySet {
+    class PrometheusLabelledSummary implements SummarySet {
 
         private final io.prometheus.client.Summary inner;
         private final String name;
@@ -390,9 +436,13 @@
 
         @Override
         public void add(String key, long value) {
+            reportMetrics(() -> observe(key, value));
+        }
+
+        void observe(final String key, final long value) {
             try {
                 inner.labels(key).observe(value);
-            } catch (IllegalArgumentException err) {
+            } catch (final IllegalArgumentException err) {
                 LOG.error("invalid value {} for metric {} with key {}", value, name, key, err);
             }
         }
@@ -410,4 +460,65 @@
             super.doGet(req, resp);
         }
     }
+
+    /**
+     * Builds the fixed-size worker pool that reports summary metrics off the calling thread.
+     *
+     * @return the executor, or empty when numWorkerThreads is below 1
+     */
+    private Optional<ExecutorService> createExecutor() {
+        if (numWorkerThreads < 1) {
+            LOG.info("Executor service was not created as numWorkerThreads {} is less than 1", numWorkerThreads);
+            return Optional.empty();
+        }
+        final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(maxQueueSize);
+        final ExecutorService workerPool = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
+                0L, TimeUnit.MILLISECONDS, taskQueue, new PrometheusWorkerThreadFactory());
+        LOG.info("Executor service was created with numWorkerThreads {} and maxQueueSize {}",
+                numWorkerThreads, maxQueueSize);
+        return Optional.of(workerPool);
+    }
+
+    private void shutdownExecutor() {
+        executorOptional.ifPresent(executor -> { // empty when no worker pool was created: nothing to stop
+            LOG.info("Shutdown executor service with timeout {}", workerShutdownTimeoutMs);
+            executor.shutdown(); // orderly shutdown first; force it below only if the wait fails
+            try {
+                if (!executor.awaitTermination(workerShutdownTimeoutMs, TimeUnit.MILLISECONDS)) {
+                    LOG.error("Not all the Prometheus worker threads terminated properly after {} timeout",
+                            workerShutdownTimeoutMs);
+                    executor.shutdownNow();
+                }
+            } catch (final InterruptedException e) {
+                LOG.error("Error occurred while terminating Prometheus worker threads", e);
+                executor.shutdownNow();
+                Thread.currentThread().interrupt(); // restore the flag so the caller of stop() still sees it
+            }
+        });
+    }
+
+    /** Builds daemon threads named PrometheusMetricsProviderWorker-N, N from a static counter starting at 1. */
+    private static class PrometheusWorkerThreadFactory implements ThreadFactory {
+        private static final AtomicInteger nextId = new AtomicInteger(1);
+
+        @Override
+        public Thread newThread(final Runnable runnable) {
+            final Thread worker = new Thread(runnable, "PrometheusMetricsProviderWorker-" + nextId.getAndIncrement());
+            worker.setDaemon(true);
+            return worker;
+        }
+    }
+
+    private void reportMetrics(final Runnable task) {
+        if (!executorOptional.isPresent()) {
+            task.run(); // no worker pool: report synchronously on the calling thread
+            return;
+        }
+        try {
+            executorOptional.get().submit(task);
+        } catch (final RejectedExecutionException e) { // a rejected task is dropped; it is only logged
+            rateLogger.rateLimitLog("Prometheus metrics reporting task queue size exceeded the max",
+                    String.valueOf(maxQueueSize));
+        }
+    }
 }
diff --git a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java
index 8331b05..94f1592 100644
--- a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java
+++ b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
@@ -178,10 +180,11 @@
 
     @Test
     public void testBasicSummary() throws Exception {
-        Summary summary = provider.getRootContext()
+        final PrometheusMetricsProvider.PrometheusSummary summary =
+                (PrometheusMetricsProvider.PrometheusSummary) provider.getRootContext()
                 .getSummary("cc", MetricsContext.DetailLevel.BASIC);
-        summary.add(10);
-        summary.add(10);
+        summary.observe(10);
+        summary.observe(10);
         int[] count = {0};
         provider.dump((k, v) -> {
             count[0]++;
@@ -227,10 +230,11 @@
 
     @Test
     public void testAdvancedSummary() throws Exception {
-        Summary summary = provider.getRootContext()
+        final PrometheusMetricsProvider.PrometheusSummary summary =
+                (PrometheusMetricsProvider.PrometheusSummary) provider.getRootContext()
                 .getSummary("cc", MetricsContext.DetailLevel.ADVANCED);
-        summary.add(10);
-        summary.add(10);
+        summary.observe(10);
+        summary.observe(10);
         int[] count = {0};
         provider.dump((k, v) -> {
             count[0]++;
@@ -282,6 +286,61 @@
         assertThat(res, CoreMatchers.containsString("cc{quantile=\"0.99\",} 10.0"));
     }
 
+    /** With numWorkerThreads below 1, add() runs inline; checks that dump() exposes three "cc" entries. */
+    @Test
+    public void testSummary_sync() throws Exception {
+        final Properties props = new Properties();
+        props.setProperty("numWorkerThreads", "0");
+        props.setProperty("httpPort", "0"); // ephemeral port
+        props.setProperty("exportJvmInfo", "false");
+
+        PrometheusMetricsProvider syncProvider = null;
+        try {
+            syncProvider = new PrometheusMetricsProvider();
+            syncProvider.configure(props);
+            syncProvider.start();
+            final Summary ccSummary =
+                    syncProvider.getRootContext().getSummary("cc", MetricsContext.DetailLevel.BASIC);
+            ccSummary.add(10);
+            ccSummary.add(20);
+            final Map<String, Object> dumped = new HashMap<>();
+            syncProvider.dump(dumped::put);
+            final long ccEntries = dumped.keySet().stream().filter(name -> name.startsWith("cc")).count();
+            assertEquals(3, ccEntries);
+        } finally {
+            if (syncProvider != null) {
+                syncProvider.stop();
+            }
+        }
+    }
+
+    /** With one worker and a queue of size 1, more add() calls than fit must not throw to the caller. */
+    @Test
+    public void testSummary_asyncAndExceedMaxQueueSize() throws Exception {
+        final Properties props = new Properties();
+        props.setProperty("numWorkerThreads", "1");
+        props.setProperty("maxQueueSize", "1");
+        props.setProperty("httpPort", "0"); // ephemeral port
+        props.setProperty("exportJvmInfo", "false");
+
+        PrometheusMetricsProvider asyncProvider = null;
+        try {
+            asyncProvider = new PrometheusMetricsProvider();
+            asyncProvider.configure(props);
+            asyncProvider.start();
+            final Summary ccSummary =
+                    asyncProvider.getRootContext().getSummary("cc", MetricsContext.DetailLevel.ADVANCED);
+            // make sure no error is thrown
+            for (int attempt = 0; attempt < 10; attempt++) {
+                ccSummary.add(10);
+            }
+        } finally {
+            if (asyncProvider != null) {
+                asyncProvider.stop();
+            }
+        }
+    }
+
     private String callServlet() throws ServletException, IOException {
         // we are not performing an HTTP request
         // but we are calling directly the servlet