Exposing prometheus metrics for Pulsar function local run mode (#10156)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index fca0298..4975280 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -23,6 +23,7 @@
 import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
@@ -81,6 +82,7 @@
 import org.apache.pulsar.functions.LocalRunner;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -411,6 +413,7 @@
         functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
         functionConfig.setJar(jarFilePathUrl);
+        int metricsPort = FunctionCommon.findAvailablePort();
         @Cleanup
         LocalRunner localRunner = LocalRunner.builder()
                 .functionConfig(functionConfig)
@@ -420,6 +423,7 @@
                 .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
                 .tlsAllowInsecureConnection(true)
                 .tlsHostNameVerificationEnabled(false)
+                .metricsPortStart(metricsPort)
                 .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
         localRunner.start(false);
 
@@ -463,6 +467,21 @@
         assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
                 totalMsgs);
 
+        // validate prometheus metrics
+        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+        assertFalse(metrics.isEmpty());
+
+        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_processed_successfully_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), functionName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, functionName));
+        assertEquals(m.value, 5.0);
+
         // stop functions
         localRunner.stop();
 
@@ -771,6 +790,7 @@
         }
 
         sinkConfig.setArchive(jarFilePathUrl);
+        int metricsPort = FunctionCommon.findAvailablePort();
         @Cleanup
         LocalRunner localRunner = LocalRunner.builder()
                 .sinkConfig(sinkConfig)
@@ -782,6 +802,7 @@
                 .tlsHostNameVerificationEnabled(false)
                 .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
                 .connectorsDirectory(workerConfig.getConnectorsDirectory())
+                .metricsPortStart(metricsPort)
                 .build();
 
         localRunner.start(false);
@@ -819,6 +840,21 @@
             }
         }, 5, 200);
 
+        // validate prometheus metrics
+        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(metricsPort);
+        log.info("prometheus metrics: {}", prometheusMetrics);
+
+        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+        assertFalse(metrics.isEmpty());
+
+        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_written_total");
+        assertEquals(m.tags.get("cluster"), config.getClusterName());
+        assertEquals(m.tags.get("instance_id"), "0");
+        assertEquals(m.tags.get("name"), sinkName);
+        assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion));
+        assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sinkName));
+        assertEquals(m.value, 10.0);
+
         // stop sink
         localRunner.stop();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
new file mode 100644
index 0000000..769cfe9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestUtils.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Slf4j
+public class PulsarFunctionTestUtils {
+    public static String getPrometheusMetrics(int metricsPort) throws IOException {
+        StringBuilder result = new StringBuilder();
+        URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod("GET");
+        BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+        String line;
+        while ((line = rd.readLine()) != null) {
+            result.append(line + System.lineSeparator());
+        }
+        rd.close();
+        return result.toString();
+    }
+
+    /**
+     * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
+     */
+    public static Map<String, Metric> parseMetrics(String metrics) {
+        final Map<String, Metric> parsed = new HashMap<>();
+        // Example of lines are
+        // jvm_threads_current{cluster="standalone",} 203.0
+        // or
+        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
+        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
+        Pattern pattern = Pattern.compile("^(\\w+)(\\{[^\\}]+\\})?\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
+        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
+        Arrays.asList(metrics.split("\n")).forEach(line -> {
+            if (line.isEmpty() || line.startsWith("#")) {
+                return;
+            }
+            Matcher matcher = pattern.matcher(line);
+            log.info("line: {}", line);
+            checkArgument(matcher.matches());
+            String name = matcher.group(1);
+            Metric m = new Metric();
+            String numericValue = matcher.group(3);
+            if (numericValue.equalsIgnoreCase("-Inf")) {
+                m.value = Double.NEGATIVE_INFINITY;
+            } else if (numericValue.equalsIgnoreCase("+Inf")) {
+                m.value = Double.POSITIVE_INFINITY;
+            } else {
+                m.value = Double.parseDouble(numericValue);
+            }
+            String tags = matcher.group(2);
+            if (tags != null) {
+                tags = tags.replace("{", "").replace("}", "");
+                Matcher tagsMatcher = tagsPattern.matcher(tags);
+                while (tagsMatcher.find()) {
+                    String tag = tagsMatcher.group(1);
+                    String value = tagsMatcher.group(2);
+                    m.tags.put(tag, value);
+                }
+            }
+            parsed.put(name, m);
+        });
+
+        log.info("parsed metrics: {}", parsed);
+        return parsed;
+    }
+
+    @ToString
+    public static class Metric {
+        public final Map<String, String> tags = new TreeMap<>();
+        public double value;
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
index ef9f65c..53ca40a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/AbstractPulsarE2ETest.java
@@ -297,64 +297,4 @@
         workerService.init(workerConfig, null, false);
         return workerService;
     }
-    
-    protected static String getPrometheusMetrics(int metricsPort) throws IOException {
-        StringBuilder result = new StringBuilder();
-        URL url = new URL(String.format("http://%s:%s/metrics", "localhost", metricsPort));
-        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-        conn.setRequestMethod("GET");
-        BufferedReader rd = new BufferedReader(new InputStreamReader(conn.getInputStream()));
-        String line;
-        while ((line = rd.readLine()) != null) {
-            result.append(line + System.lineSeparator());
-        }
-        rd.close();
-        return result.toString();
-    }
-
-    /**
-     * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
-     */
-    protected static Map<String, Metric> parseMetrics(String metrics) {
-        final Map<String, Metric> parsed = new HashMap<>();
-        // Example of lines are
-        // jvm_threads_current{cluster="standalone",} 203.0
-        // or
-        // pulsar_subscriptions_count{cluster="standalone", namespace="sample/standalone/ns1",
-        // topic="persistent://sample/standalone/ns1/test-2"} 0.0 1517945780897
-        Pattern pattern = Pattern.compile("^(\\w+)\\{([^\\}]+)\\}\\s(-?[\\d\\w\\.-]+)(\\s(\\d+))?$");
-        Pattern tagsPattern = Pattern.compile("(\\w+)=\"([^\"]+)\"(,\\s?)?");
-        Arrays.asList(metrics.split("\n")).forEach(line -> {
-            if (line.isEmpty() || line.startsWith("#")) {
-                return;
-            }
-            Matcher matcher = pattern.matcher(line);
-            checkArgument(matcher.matches());
-            String name = matcher.group(1);
-            Metric m = new Metric();
-            String numericValue = matcher.group(3);
-            if (numericValue.equalsIgnoreCase("-Inf")) {
-                m.value = Double.NEGATIVE_INFINITY;
-            } else if (numericValue.equalsIgnoreCase("+Inf")) {
-                m.value = Double.POSITIVE_INFINITY;
-            } else {
-                m.value = Double.parseDouble(numericValue);
-            }
-            String tags = matcher.group(2);
-            Matcher tagsMatcher = tagsPattern.matcher(tags);
-            while (tagsMatcher.find()) {
-                String tag = tagsMatcher.group(1);
-                String value = tagsMatcher.group(2);
-                m.tags.put(tag, value);
-            }
-            parsed.put(name, m);
-        });
-        return parsed;
-    }
-    
-    @ToString
-    static class Metric {
-        final Map<String, String> tags = new TreeMap<>();
-        double value;
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java
index b2cd9ee..026d988 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarBatchSourceE2ETest.java
@@ -35,6 +35,7 @@
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
 import org.apache.pulsar.io.batchdiscovery.ImmediateTriggerer;
 import org.testng.annotations.Test;
 
@@ -101,11 +102,11 @@
         }, 50, 150);
         assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
 
-        String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
         log.info("prometheusMetrics: {}", prometheusMetrics);
 
-        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
-        Metric m = metrics.get("pulsar_source_received_total");
+        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), sourceName);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 4a24415..e56d4f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -32,6 +32,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -48,6 +49,7 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -57,6 +59,7 @@
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -320,11 +323,11 @@
         assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());
 
         // validate prometheus metrics empty
-        String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
         log.info("prometheus metrics: {}", prometheusMetrics);
 
-        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
-        Metric m = metrics.get("pulsar_function_received_total");
+        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_function_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), functionName);
@@ -478,10 +481,10 @@
         assertEquals(functionInstanceStats, functionStats.instances.get(0).getMetrics());
 
         // validate prometheus metrics
-        prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
         log.info("prometheus metrics: {}", prometheusMetrics);
 
-        metrics = parseMetrics(prometheusMetrics);
+        metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
         m = metrics.get("pulsar_function_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
@@ -927,5 +930,4 @@
         // make sure subscriptions are cleanup
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
     }
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
index 1eb7077..0299464 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java
@@ -49,6 +49,7 @@
 import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.functions.LocalRunner;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
@@ -107,9 +108,9 @@
         // 5 Sink should only read compacted value,so we will only receive compacted messages
         retryStrategically((test) -> {
             try {
-                String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
-                Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
-                Metric m = metrics.get("pulsar_sink_received_total");
+                String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+                Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+                PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
                 return m.value == (double) maxKeys;
             } catch (Exception e) {
                 return false;
@@ -249,11 +250,11 @@
         assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 523);
 
         // validate prometheus metrics empty
-        String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
         log.info("prometheus metrics: {}", prometheusMetrics);
 
-        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
-        Metric m = metrics.get("pulsar_sink_received_total");
+        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_sink_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), sinkName);
@@ -332,10 +333,10 @@
         }, 5, 200);
 
         // get stats after producing
-        prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
         log.info("prometheusMetrics: {}", prometheusMetrics);
 
-        metrics = parseMetrics(prometheusMetrics);
+        metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
         m = metrics.get("pulsar_sink_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java
index f01a4bc..af08363 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSourceE2ETest.java
@@ -33,6 +33,7 @@
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestUtils;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
@@ -105,11 +106,11 @@
         }, 50, 150);
         assertEquals(admin.topics().getStats(sinkTopic2).publishers.size(), 1);
 
-        String prometheusMetrics = getPrometheusMetrics(pulsar.getListenPortHTTP().get());
+        String prometheusMetrics = PulsarFunctionTestUtils.getPrometheusMetrics(pulsar.getListenPortHTTP().get());
         log.info("prometheusMetrics: {}", prometheusMetrics);
 
-        Map<String, Metric> metrics = parseMetrics(prometheusMetrics);
-        Metric m = metrics.get("pulsar_source_received_total");
+        Map<String, PulsarFunctionTestUtils.Metric> metrics = PulsarFunctionTestUtils.parseMetrics(prometheusMetrics);
+        PulsarFunctionTestUtils.Metric m = metrics.get("pulsar_source_received_total");
         assertEquals(m.tags.get("cluster"), config.getClusterName());
         assertEquals(m.tags.get("instance_id"), "0");
         assertEquals(m.tags.get("name"), sourceName);
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index fdfdd90..f599139 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -30,6 +30,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.net.InetSocketAddress;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.LinkedList;
@@ -45,6 +46,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.exporter.HTTPServer;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -59,6 +63,7 @@
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
@@ -166,6 +171,8 @@
     protected String secretsProviderClassName;
     @Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
     protected String secretsProviderConfig;
+    @Parameter(names = "--metricsPortStart", description = "The starting port range for metrics server", hidden = true)
+    protected Integer metricsPortStart;
 
     private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
     private static final String DEFAULT_WEB_SERVICE_URL = "http://localhost:8080";
@@ -186,7 +193,7 @@
                        boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled,
                        String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv,
                        String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory,
-                       String connectorsDirectory) {
+                       String connectorsDirectory, Integer metricsPortStart) {
         this.functionConfig = functionConfig;
         this.sourceConfig = sourceConfig;
         this.sinkConfig = sinkConfig;
@@ -218,6 +225,7 @@
             }
             this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
         }
+        this.metricsPortStart = metricsPortStart;
         shutdownHook = new Thread() {
             public void run() {
                 LocalRunner.this.stop();
@@ -544,6 +552,11 @@
         if (functionConfig != null && functionConfig.getExposePulsarAdminClientEnabled() != null) {
             exposePulsarAdminClientEnabled = functionConfig.getExposePulsarAdminClientEnabled();
         }
+
+        // Collector Registry for prometheus metrics
+        CollectorRegistry collectorRegistry = new CollectorRegistry();
+        RuntimeUtils.registerDefaultCollectors(collectorRegistry);
+
         ThreadRuntimeFactory threadRuntimeFactory;
         ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
         try {
@@ -555,7 +568,7 @@
                     stateStorageServiceUrl,
                     authConfig,
                     secretsProvider,
-                    null, narExtractionDirectory,
+                    collectorRegistry, narExtractionDirectory,
                     null,
                     exposePulsarAdminClientEnabled, webServiceUrl);
         } finally {
@@ -569,8 +582,12 @@
             instanceConfig.setFunctionId(UUID.randomUUID().toString());
             instanceConfig.setInstanceId(i + instanceIdOffset);
             instanceConfig.setMaxBufferedTuples(1024);
-            instanceConfig.setPort(FunctionCommon.findAvailablePort());
-            instanceConfig.setMetricsPort(FunctionCommon.findAvailablePort());
+            if (metricsPortStart != null) {
+                if (metricsPortStart < 0 || metricsPortStart > 65535) {
+                    throw new IllegalArgumentException("Metrics port need to be within the range of 0 and 65535");
+                }
+                instanceConfig.setMetricsPort(metricsPortStart + i);
+            }
             instanceConfig.setClusterName("local");
             if (functionConfig != null) {
                 instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
@@ -578,6 +595,7 @@
                     instanceConfig.setExposePulsarAdminClientEnabled(functionConfig.getExposePulsarAdminClientEnabled());
                 }
             }
+
             RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                     instanceConfig,
                     userCodeFile,
@@ -586,6 +604,13 @@
                     30000);
             spawners.add(runtimeSpawner);
             runtimeSpawner.start();
+
+            if (metricsPortStart != null) {
+                // starting metrics server
+                log.info("Starting metrics server on port {}", instanceConfig.getMetricsPort());
+                new HTTPServer(new InetSocketAddress(instanceConfig.getMetricsPort()), collectorRegistry, true);
+            }
+
         }
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 2067110..0de9782 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -31,14 +31,6 @@
 import io.grpc.stub.StreamObserver;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.exporter.HTTPServer;
-import io.prometheus.client.hotspot.BufferPoolsExports;
-import io.prometheus.client.hotspot.ClassLoadingExports;
-import io.prometheus.client.hotspot.GarbageCollectorExports;
-import io.prometheus.client.hotspot.MemoryPoolsExports;
-import io.prometheus.client.hotspot.StandardExports;
-import io.prometheus.client.hotspot.ThreadExports;
-import io.prometheus.client.hotspot.VersionInfoExports;
-import io.prometheus.jmx.JmxCollector;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.nar.NarClassLoader;
@@ -53,7 +45,6 @@
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.common.util.Reflections;
 
-import javax.management.MalformedObjectNameException;
 import java.lang.reflect.Type;
 import java.net.InetSocketAddress;
 import java.util.Map;
@@ -201,7 +192,7 @@
 
         // Collector Registry for prometheus metrics
         CollectorRegistry collectorRegistry = new CollectorRegistry();
-        registerDefaultCollectors(collectorRegistry);
+        RuntimeUtils.registerDefaultCollectors(collectorRegistry);
 
         containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
                 stateStorageServiceUrl,
@@ -261,23 +252,6 @@
         close();
     }
 
-    private void registerDefaultCollectors(CollectorRegistry registry) {
-        // Add the JMX exporter for functionality similar to the kafka connect JMX metrics
-        try {
-            new JmxCollector("{}").register(registry);
-        } catch (MalformedObjectNameException ex) {
-            System.err.println(ex);
-        }
-        // Add the default exports from io.prometheus.client.hotspot.DefaultExports
-        new StandardExports().register(registry);
-        new MemoryPoolsExports().register(registry);
-        new BufferPoolsExports().register(registry);
-        new GarbageCollectorExports().register(registry);
-        new ThreadExports().register(registry);
-        new ClassLoadingExports().register(registry);
-        new VersionInfoExports().register(registry);
-    }
-
     private static boolean isTrue(String param) {
         return Boolean.TRUE.toString().equals(param);
     }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 1ac1ece..c62a6bf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -32,6 +32,15 @@
 import java.util.List;
 import java.util.Map;
 
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.hotspot.BufferPoolsExports;
+import io.prometheus.client.hotspot.ClassLoadingExports;
+import io.prometheus.client.hotspot.GarbageCollectorExports;
+import io.prometheus.client.hotspot.MemoryPoolsExports;
+import io.prometheus.client.hotspot.StandardExports;
+import io.prometheus.client.hotspot.ThreadExports;
+import io.prometheus.client.hotspot.VersionInfoExports;
+import io.prometheus.jmx.JmxCollector;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -41,6 +50,8 @@
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
+import javax.management.MalformedObjectNameException;
+
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
@@ -464,4 +475,20 @@
         return ObjectMapperFactory.getThreadLocal().convertValue(configMap, functionRuntimeConfigClass);
     }
 
+    public static void registerDefaultCollectors(CollectorRegistry registry) {
+        // Add the JMX exporter for functionality similar to the kafka connect JMX metrics
+        try {
+            new JmxCollector("{}").register(registry);
+        } catch (MalformedObjectNameException ex) {
+            System.err.println(ex);
+        }
+        // Add the default exports from io.prometheus.client.hotspot.DefaultExports
+        new StandardExports().register(registry);
+        new MemoryPoolsExports().register(registry);
+        new BufferPoolsExports().register(registry);
+        new GarbageCollectorExports().register(registry);
+        new ThreadExports().register(registry);
+        new ClassLoadingExports().register(registry);
+        new VersionInfoExports().register(registry);
+    }
 }