Exposing prometheus metrics for Pulsar function local run mode (#10156)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index fca0298..4975280 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -23,6 +23,7 @@
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@@ -81,6 +82,7 @@
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -411,6 +413,7 @@
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setJar(jarFilePathUrl);
+ int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
.functionConfig(functionConfig)
@@ -420,6 +423,7 @@
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
.tlsAllowInsecureConnection(true)
.tlsHostNameVerificationEnabled(false)
+ .metricsPortStart(metricsPort)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
localRunner.start(false);
@@ -463,6 +467,21 @@
assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
totalMsgs);
+ // validate prometheus metrics
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
+ log.info("prometheus metrics: {}", prometheusMetrics);
+
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+ assertFalse(metrics.isEmpty());
+
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("name"), functionName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+ assertEquals(m.value, 5.0);
+
// stop functions
localRunner.stop();
@@ -771,6 +790,7 @@
}
sinkConfig.setArchive(jarFilePathUrl);
+ int metricsPort = FunctionCommon.findAvailablePort();
@Cleanup
LocalRunner localRunner = LocalRunner.builder()
.sinkConfig(sinkConfig)
@@ -782,6 +802,7 @@
.tlsHostNameVerificationEnabled(false)
.brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
.connectorsDirectory(workerConfig.getConnectorsDirectory())
+ .metricsPortStart(metricsPort)
.build();
localRunner.start(false);
@@ -819,6 +840,21 @@
}
}, 5, 200);
+ // validate prometheus metrics
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
+ log.info("prometheus metrics: {}", prometheusMetrics);
+
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+ assertFalse(metrics.isEmpty());
+
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
+ assertEquals(m.tags.get("cluster"), config.getClusterName());
+ assertEquals(m.tags.get("instance_id"), "0");
+ assertEquals(m.tags.get("name"), sinkName);
+ assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+ assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
+ assertEquals(m.value, 10.0);
+
// stop sink
localRunner.stop();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
new file mode 100644
index 0000000..769cfe9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Slf4j
+public class PulsarFunctionTestUtils {
+ public static String getPrometheusMetrics(int metricsPort) throws IOException {
+ StringBuilder result = new StringBuilder();
+ URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+ String line;
+ while ((line = rd.readLine()) != null) {
+ result.append(line + System.lineSeparator());
+ }
+ rd.close();
+ return result.toString();
+ }
+
+ /**
+ * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
+ */
+ public static Map<String, Metric> parseMetrics(String metrics) {
+ final Map<String, Metric> parsed = new HashMap<>();
+ // Example of lines are
+ // jvm_threads_current{cluster="standalone",} 203.0
+ // or
+ // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+ // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+ Pattern pattern = Pattern.compile("^(\\w+)(\\{[^\\}]+\\})?\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
+ Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+ Arrays.asList(metrics.split("\n")).forEach(line -> {
+ if (line.isEmpty() || line.startsWith("#")) {
+ return;
+ }
+ Matcher matcher = pattern.matcher(line);
+ log.info("line: {}", line);
+ checkArgument(matcher.matches());
+ String name = matcher.group(1);
+ Metric m = new Metric();
+ String numericValue = matcher.group(3);
+ if (numericValue.equalsIgnoreCase("-Inf")) {
+ m.value = Double.NEGATIVE_INFINITY;
+ } else if (numericValue.equalsIgnoreCase("+Inf")) {
+ m.value = Double.POSITIVE_INFINITY;
+ } else {
+ m.value = Double.parseDouble(numericValue);
+ }
+ String tags = matcher.group(2);
+ if (tags != null) {
+ tags = tags.replace("{", "").replace("}", "");
+ Matcher tagsMatcher = tagsPattern.matcher(tags);
+ while (tagsMatcher.find()) {
+ String tag = tagsMatcher.group(1);
+ String value = tagsMatcher.group(2);
+ m.tags.put(tag, value);
+ }
+ }
+ parsed.put(name, m);
+ });
+
+ log.info("parsed metrics: {}", parsed);
+ return parsed;
+ }
+
+ @ToString
+ public static class Metric {
+ public final Map<String, String> tags = new TreeMap<>();
+ public double value;
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index ef9f65c..53ca40a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -297,64 +297,4 @@
workerService.init(workerConfig, null, false);
return workerService;
}
-
- protected static String getPrometheusMetrics(int metricsPort) throws IOException {
- StringBuilder result = new StringBuilder();
- URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- conn.setRequestMethod("GET");
- BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
- String line;
- while ((line = rd.readLine()) != null) {
- result.append(line + System.lineSeparator());
- }
- rd.close();
- return result.toString();
- }
-
- /**
- * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
- */
- protected static Map<String, Metric> parseMetrics(String metrics) {
- final Map<String, Metric> parsed = new HashMap<>();
- // Example of lines are
- // jvm_threads_current{cluster="standalone",} 203.0
- // or
- // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
- // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
- Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
- Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
- Arrays.asList(metrics.split("\n")).forEach(line -> {
- if (line.isEmpty() || line.startsWith("#")) {
- return;
- }
- Matcher matcher = pattern.matcher(line);
- checkArgument(matcher.matches());
- String name = matcher.group(1);
- Metric m = new Metric();
- String numericValue = matcher.group(3);
- if (numericValue.equalsIgnoreCase("-Inf")) {
- m.value = Double.NEGATIVE_INFINITY;
- } else if (numericValue.equalsIgnoreCase("+Inf")) {
- m.value = Double.POSITIVE_INFINITY;
- } else {
- m.value = Double.parseDouble(numericValue);
- }
- String tags = matcher.group(2);
- Matcher tagsMatcher = tagsPattern.matcher(tags);
- while (tagsMatcher.find()) {
- String tag = tagsMatcher.group(1);
- String value = tagsMatcher.group(2);
- m.tags.put(tag, value);
- }
- parsed.put(name, m);
- });
- return parsed;
- }
-
- @ToString
- static class Metric {
- final Map<String, String> tags = new TreeMap<>();
- double value;
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java
index b2cd9ee..026d988 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java
@@ -35,6 +35,7 @@
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer;
import org.testng.annotations.Test;
@@ -101,11 +102,11 @@
}, 50, 150);
assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
- String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheusMetrics: {}", prometheusMetrics);
- Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
- Metric m = metrics.get("pulsar_source_received_total");
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), sourceName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 4a24415..e56d4f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -32,6 +32,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -48,6 +49,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -57,6 +59,7 @@
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -320,11 +323,11 @@
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());
// validate prometheus metrics empty
- String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheus metrics: {}", prometheusMetrics);
- Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
- Metric m = metrics.get("pulsar_function_received_total");
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), functionName);
@@ -478,10 +481,10 @@
assertEquals(functionInstanceStats, functionStats.instances.get(0).getMetrics());
// validate prometheus metrics
- prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheus metrics: {}", prometheusMetrics);
- metrics = parseMetrics(prometheusMetrics);
+ metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
m = metrics.get("pulsar_function_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
@@ -927,5 +930,4 @@
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
}
-
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 1eb7077..0299464 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -49,6 +49,7 @@
import org.apache.pulsar.compaction.TwoPhaseCompactor;
import org.apache.pulsar.functions.LocalRunner;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
@@ -107,9 +108,9 @@
// 5 Sink should only read compacted value,so we will only receive compacted messages
retryStrategically((test) -> {
try {
- String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
- Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
- Metric m = metrics.get("pulsar_sink_received_total");
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
return m.value == (double) maxKeys;
} catch (Exception e) {
return false;
@@ -249,11 +250,11 @@
assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 523);
// validate prometheus metrics empty
- String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheus metrics: {}", prometheusMetrics);
- Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
- Metric m = metrics.get("pulsar_sink_received_total");
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), sinkName);
@@ -332,10 +333,10 @@
}, 5, 200);
// get stats after producing
- prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheusMetrics: {}", prometheusMetrics);
- metrics = parseMetrics(prometheusMetrics);
+ metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
m = metrics.get("pulsar_sink_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java
index f01a4bc..af08363 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java
@@ -33,6 +33,7 @@
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
@@ -105,11 +106,11 @@
}, 50, 150);
assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
- String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+ String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
log.info("prometheusMetrics: {}", prometheusMetrics);
- Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
- Metric m = metrics.get("pulsar_source_received_total");
+ Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+ PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_received_total");
assertEquals(m.tags.get("cluster"), config.getClusterName());
assertEquals(m.tags.get("instance_id"), "0");
assertEquals(m.tags.get("name"), sourceName);
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index fdfdd90..f599139 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -30,6 +30,7 @@
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
@@ -45,6 +46,9 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.HTTPServer;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -59,6 +63,7 @@
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
@@ -166,6 +171,8 @@
protected String secretsProviderClassName;
@Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
protected String secretsProviderConfig;
+ @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server", hidden = true)
+ protected Integer metricsPortStart;
private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
private static final String DEFAULT_WEB_SERVICE_URL = "http://localhost:8080";
@@ -186,7 +193,7 @@
boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled,
String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv,
String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory,
- String connectorsDirectory) {
+ String connectorsDirectory, Integer metricsPortStart) {
this.functionConfig = functionConfig;
this.sourceConfig = sourceConfig;
this.sinkConfig = sinkConfig;
@@ -218,6 +225,7 @@
}
this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
}
+ this.metricsPortStart = metricsPortStart;
shutdownHook = new Thread() {
public void run() {
LocalRunner.this.stop();
@@ -544,6 +552,11 @@
if (functionConfig != null && functionConfig.getExposePulsarAdminClientEnabled() != null) {
exposePulsarAdminClientEnabled = functionConfig.getExposePulsarAdminClientEnabled();
}
+
+ // Collector Registry for prometheus metrics
+ CollectorRegistry collectorRegistry = new CollectorRegistry();
+ RuntimeUtils.registerDefaultCollectors(collectorRegistry);
+
ThreadRuntimeFactory threadRuntimeFactory;
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
@@ -555,7 +568,7 @@
stateStorageServiceUrl,
authConfig,
secretsProvider,
- null, narExtractionDirectory,
+ collectorRegistry, narExtractionDirectory,
null,
exposePulsarAdminClientEnabled, webServiceUrl);
} finally {
@@ -569,8 +582,12 @@
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setInstanceId(i + instanceIdOffset);
instanceConfig.setMaxBufferedTuples(1024);
- instanceConfig.setPort(FunctionCommon.findAvailablePort());
- instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+ if (metricsPortStart != null) {
+ if (metricsPortStart < 0 || metricsPortStart > 65535) {
+ throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
+ }
+ instanceConfig.setMetricsPort(metricsPortStart + i);
+ }
instanceConfig.setClusterName("local");
if (functionConfig != null) {
instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
@@ -578,6 +595,7 @@
instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
}
}
+
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
@@ -586,6 +604,13 @@
30000);
spawners.add(runtimeSpawner);
runtimeSpawner.start();
+
+ if (metricsPortStart != null) {
+ // starting metrics server
+ log.info("Starting metrics server on port {}", instanceConfig.getMetricsPort());
+ new HTTPServer(new InetSocketAddress(instanceConfig.getMetricsPort()), collectorRegistry, true);
+ }
+
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 2067110..0de9782 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -31,14 +31,6 @@
import io.grpc.stub.StreamObserver;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.BufferPoolsExports;
-import io.prometheus.client.hotspot.ClassLoadingExports;
-import io.prometheus.client.hotspot.GarbageCollectorExports;
-import io.prometheus.client.hotspot.MemoryPoolsExports;
-import io.prometheus.client.hotspot.StandardExports;
-import io.prometheus.client.hotspot.ThreadExports;
-import io.prometheus.client.hotspot.VersionInfoExports;
-import io.prometheus.jmx.JmxCollector;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.nar.NarClassLoader;
@@ -53,7 +45,6 @@
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.common.util.Reflections;
-import javax.management.MalformedObjectNameException;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.util.Map;
@@ -201,7 +192,7 @@
// Collector Registry for prometheus metrics
CollectorRegistry collectorRegistry = new CollectorRegistry();
- registerDefaultCollectors(collectorRegistry);
+ RuntimeUtils.registerDefaultCollectors(collectorRegistry);
containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
stateStorageServiceUrl,
@@ -261,23 +252,6 @@
close();
}
- private void registerDefaultCollectors(CollectorRegistry registry) {
- // Add the JMX exporter for functionality similar to the kafka connect JMX metrics
- try {
- new JmxCollector("{}").register(registry);
- } catch (MalformedObjectNameException ex) {
- System.err.println(ex);
- }
- // Add the default exports from io.prometheus.client.hotspot.DefaultExports
- new StandardExports().register(registry);
- new MemoryPoolsExports().register(registry);
- new BufferPoolsExports().register(registry);
- new GarbageCollectorExports().register(registry);
- new ThreadExports().register(registry);
- new ClassLoadingExports().register(registry);
- new VersionInfoExports().register(registry);
- }
-
private static boolean isTrue(String param) {
return Boolean.TRUE.toString().equals(param);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 1ac1ece..c62a6bf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -32,6 +32,15 @@
import java.util.List;
import java.util.Map;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.hotspot.BufferPoolsExports;
+import io.prometheus.client.hotspot.ClassLoadingExports;
+import io.prometheus.client.hotspot.GarbageCollectorExports;
+import io.prometheus.client.hotspot.MemoryPoolsExports;
+import io.prometheus.client.hotspot.StandardExports;
+import io.prometheus.client.hotspot.ThreadExports;
+import io.prometheus.client.hotspot.VersionInfoExports;
+import io.prometheus.jmx.JmxCollector;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -41,6 +50,8 @@
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import javax.management.MalformedObjectNameException;
+
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -464,4 +475,20 @@
return ObjectMapperFactory.getThreadLocal().convertValue(configMap, functionRuntimeConfigClass);
}
+ public static void registerDefaultCollectors(CollectorRegistry registry) {
+ // Add the JMX exporter for functionality similar to the kafka connect JMX metrics
+ try {
+ new JmxCollector("{}").register(registry);
+ } catch (MalformedObjectNameException ex) {
+ System.err.println(ex);
+ }
+ // Add the default exports from io.prometheus.client.hotspot.DefaultExports
+ new StandardExports().register(registry);
+ new MemoryPoolsExports().register(registry);
+ new BufferPoolsExports().register(registry);
+ new GarbageCollectorExports().register(registry);
+ new ThreadExports().register(registry);
+ new ClassLoadingExports().register(registry);
+ new VersionInfoExports().register(registry);
+ }
}