[Testing] Improve Functions unit tests by using unique nar extraction directory (#10032)

- Use unique temporary nar extraction directory for each test
  - This fixes possible issues caused by running tests with testForkCount > 1 setting (current status in master branch)
  - Delete directory consistently in AfterMethod

- Add feature LocalRunner to create a unique temp directory for extracting
  nar files. Delete directory on close

- Fix some ClassLoader resource cleanup issues that came up while working on the changes
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java
index 6d1fbdb..ce3d770 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java
@@ -18,30 +18,40 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import com.google.api.Http;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpServer;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.nio.file.Files;
 import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
 
 /**
  * Simple http server for serving files in Pulsar Function test cases
  */
 @Slf4j
 public class FileServer implements AutoCloseable {
+    private static final String HEALTH_PATH = "/health";
     private final HttpServer httpServer;
 
     public FileServer() throws IOException {
         httpServer = HttpServer.create(new InetSocketAddress(0), 0);
         // creates a default executor
         httpServer.setExecutor(null);
+        httpServer.createContext(HEALTH_PATH, he -> {
+           he.sendResponseHeaders(204, 0);
+        });
     }
 
     public void serveFile(String path, File file) {
+        assertTrue(file.exists(), file.getAbsolutePath() + " doesn't exist.");
         httpServer.createContext(path, he -> {
             try {
                 Headers headers = he.getResponseHeaders();
@@ -59,6 +69,29 @@
 
     public void start() {
         httpServer.start();
+        waitUntilServerIsAvailable();
+    }
+
+    private void waitUntilServerIsAvailable() {
+        // wait until server is available.
+        // There has been a few flakiness issues where the server hasn't been available when
+        // the system-under-test has started to download files
+        // this assertion will call the "/health" endpoint and check that 204 status code is returned.
+        Awaitility.await()
+                .ignoreExceptions()
+                .untilAsserted(() -> {
+                    HttpURLConnection urlConnection = (HttpURLConnection) new URL(getUrl(HEALTH_PATH))
+                            .openConnection();
+                    urlConnection.setUseCaches(false);
+                    urlConnection.setConnectTimeout(5000);
+                    urlConnection.setReadTimeout(5000);
+                    try {
+                        urlConnection.connect();
+                        assertEquals(urlConnection.getResponseCode(), 204);
+                    } finally {
+                        urlConnection.disconnect();
+                    }
+                });
     }
 
     public void stop() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 9d7a796..fb3d1ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -30,9 +30,7 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import io.jsonwebtoken.SignatureAlgorithm;
-
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.Collections;
@@ -42,9 +40,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-
 import javax.crypto.SecretKey;
-
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -63,7 +59,6 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -112,6 +107,7 @@
     private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
     private String adminToken;
     private String brokerServiceUrl;
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     @DataProvider(name = "validRoleName")
     public Object[][] validRoleName() {
@@ -210,12 +206,18 @@
 
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {
-        log.info("--- Shutting down ---");
-        pulsarClient.close();
-        superUserAdmin.close();
-        functionsWorkerService.stop();
-        pulsar.close();
-        bkEnsemble.stop();
+        try {
+            log.info("--- Shutting down ---");
+            pulsarClient.close();
+            superUserAdmin.close();
+            functionsWorkerService.stop();
+            pulsar.close();
+            bkEnsemble.stop();
+        } finally {
+            if (tempDirectory != null) {
+                tempDirectory.delete();
+            }
+        }
     }
 
     private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
@@ -224,6 +226,8 @@
                 FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
 
         workerConfig = new WorkerConfig();
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index f24a520..fca0298 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -22,15 +22,14 @@
 import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
 import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
 import static org.mockito.Mockito.spy;
-
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -48,7 +47,6 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
-import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -122,6 +120,7 @@
     private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
 
     private static final String SYSTEM_PROPERTY_NAME_NAR_FILE_PATH = "pulsar-io-data-generator.nar.path";
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     public static File getPulsarIODataGeneratorNar() {
         return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_NAR_FILE_PATH)
@@ -177,15 +176,6 @@
 
     @BeforeMethod
     void setup(Method method) throws Exception {
-
-        // delete all function temp files
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));
-
-        for (File file : foundFiles) {
-            file.delete();
-        }
-
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
@@ -230,16 +220,8 @@
         if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
             File connectorsDir = new File(workerConfig.getConnectorsDirectory());
 
-            if (connectorsDir.exists()) {
-                FileUtils.deleteDirectory(connectorsDir);
-            }
-
-            if (connectorsDir.mkdir()) {
-                File file = getPulsarIODataGeneratorNar();
-                Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
-            } else {
-                throw new RuntimeException("Failed to create builtin connectors directory");
-            }
+            File file = getPulsarIODataGeneratorNar();
+            Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
         }
 
         Optional<WorkerService> functionWorkerService = Optional.empty();
@@ -292,16 +274,17 @@
 
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {
-        log.info("--- Shutting down ---");
-        fileServer.stop();
-        pulsarClient.close();
-        admin.close();
-        pulsar.close();
-        bkEnsemble.stop();
-
-        File connectorsDir = new File(workerConfig.getConnectorsDirectory());
-        if (connectorsDir.exists()) {
-            FileUtils.deleteDirectory(connectorsDir);
+        try {
+            log.info("--- Shutting down ---");
+            fileServer.stop();
+            pulsarClient.close();
+            admin.close();
+            pulsar.close();
+            bkEnsemble.stop();
+        } finally {
+            if (tempDirectory != null) {
+                tempDirectory.delete();
+            }
         }
     }
 
@@ -311,6 +294,8 @@
                 FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
 
         WorkerConfig workerConfig = new WorkerConfig();
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -681,7 +666,9 @@
                 .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
                 .tlsAllowInsecureConnection(true)
                 .tlsHostNameVerificationEnabled(false)
-                .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
+                .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
+                .connectorsDirectory(workerConfig.getConnectorsDirectory())
+                .build();
 
         localRunner.start(false);
 
@@ -793,7 +780,9 @@
                 .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
                 .tlsAllowInsecureConnection(true)
                 .tlsHostNameVerificationEnabled(false)
-                .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
+                .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
+                .connectorsDirectory(workerConfig.getConnectorsDirectory())
+                .build();
 
         localRunner.start(false);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 0bf0886..d7597d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -28,6 +28,19 @@
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -50,7 +63,6 @@
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -66,21 +78,6 @@
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import java.io.File;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Test Pulsar function state
  */
@@ -107,6 +104,7 @@
     private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
     private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
     private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     @DataProvider(name = "validRoleName")
     public Object[][] validRoleName() {
@@ -115,15 +113,6 @@
 
     @BeforeMethod
     void setup(Method method) throws Exception {
-
-        // delete all function temp files
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));
-
-        for (File file : foundFiles) {
-            file.delete();
-        }
-
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
@@ -214,17 +203,25 @@
 
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {
-        log.info("--- Shutting down ---");
-        pulsarClient.close();
-        admin.close();
-        functionsWorkerService.stop();
-        pulsar.close();
-        bkEnsemble.stop();
+        try {
+            log.info("--- Shutting down ---");
+            pulsarClient.close();
+            admin.close();
+            functionsWorkerService.stop();
+            pulsar.close();
+            bkEnsemble.stop();
+        } finally {
+            if (tempDirectory != null) {
+                tempDirectory.delete();
+            }
+        }
     }
 
     private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
 
         workerConfig = new WorkerConfig();
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -366,11 +363,7 @@
         // make sure subscriptions are cleanup
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
 
-        // make sure all temp files are deleted
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
     }
 
     @Test
@@ -507,11 +500,7 @@
         // make sure subscriptions are cleanup
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
 
-        // make sure all temp files are deleted
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
 
         DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
new file mode 100644
index 0000000..b97d2a1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+
+/**
+ * Creates a temporary directory that contains 3 subdirectories,
+ * "narExtractionDirectory", "downloadDirectory" and "connectorsDirectory",
+ * which are assigned to the provided workerConfig's respective settings with
+ * the {@link #useTemporaryDirectoriesForWorkerConfig(WorkerConfig)} method
+ */
+public class PulsarFunctionTestTemporaryDirectory {
+    private final File tempDirectory;
+    private final File narExtractionDirectory;
+    private final File downloadDirectory;
+    private final File connectorsDirectory;
+
+    private PulsarFunctionTestTemporaryDirectory(String tempDirectoryNamePrefix) throws IOException {
+        tempDirectory = Files.createTempDirectory(tempDirectoryNamePrefix).toFile();
+        narExtractionDirectory = new File(tempDirectory, "narExtractionDirectory");
+        narExtractionDirectory.mkdir();
+        downloadDirectory = new File(tempDirectory, "downloadDirectory");
+        downloadDirectory.mkdir();
+        connectorsDirectory = new File(tempDirectory, "connectorsDirectory");
+        connectorsDirectory.mkdir();
+    }
+
+    public static PulsarFunctionTestTemporaryDirectory create(String tempDirectoryNamePrefix) {
+        try {
+            return new PulsarFunctionTestTemporaryDirectory(tempDirectoryNamePrefix);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Cannot create temporary directory", e);
+        }
+    }
+
+    public void useTemporaryDirectoriesForWorkerConfig(WorkerConfig workerConfig) {
+        workerConfig.setNarExtractionDirectory(narExtractionDirectory.getAbsolutePath());
+        workerConfig.setDownloadDirectory(downloadDirectory.getAbsolutePath());
+        workerConfig.setConnectorsDirectory(connectorsDirectory.getAbsolutePath());
+    }
+
+    public void delete() {
+        try {
+            FileUtils.deleteDirectory(tempDirectory);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Cannot delete temporary directory", e);
+        }
+    }
+
+    public void assertThatFunctionDownloadTempFilesHaveBeenDeleted() {
+        // make sure all temp files are deleted
+        File[] foundFiles = downloadDirectory.listFiles((dir1, name) -> name.startsWith("function"));
+        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: "
+                + Arrays.asList(foundFiles));
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 54c24a7..61e7e61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import static org.testng.Assert.assertEquals;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
+import java.io.Closeable;
 import java.io.File;
-import java.net.MalformedURLException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -50,8 +54,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.assertEquals;
-
 @Slf4j
 @Test(groups = "functions-worker")
 public class PulsarFunctionTlsTest {
@@ -72,6 +74,7 @@
     protected String testCluster = "my-cluster";
     protected String testTenant = "my-tenant";
     protected String testNamespace = testTenant + "/my-ns";
+    private PulsarFunctionTestTemporaryDirectory[] tempDirectories = new PulsarFunctionTestTemporaryDirectory[BROKER_COUNT];
 
     @BeforeMethod
     void setup() throws Exception {
@@ -114,6 +117,8 @@
             config.setTlsEnabled(true);
 
             WorkerConfig workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(config, null);
+            tempDirectories[i] = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+            tempDirectories[i].useTemporaryDirectoriesForWorkerConfig(workerConfig);
             workerConfig.setPulsarFunctionsNamespace("public/functions");
             workerConfig.setPulsarFunctionsCluster("my-cluster");
             workerConfig.setSchedulerClassName(
@@ -169,15 +174,23 @@
 
     @AfterMethod(alwaysRun = true)
     void tearDown() throws Exception {
-        for (int i = 0; i < BROKER_COUNT; i++) {
-            if (pulsarServices[i] != null) {
-                pulsarServices[i].close();
+        try {
+            for (int i = 0; i < BROKER_COUNT; i++) {
+                if (pulsarServices[i] != null) {
+                    pulsarServices[i].close();
+                }
+                if (pulsarAdmins[i] != null) {
+                    pulsarAdmins[i].close();
+                }
             }
-            if (pulsarAdmins[i] != null) {
-                pulsarAdmins[i].close();
+            bkEnsemble.stop();
+        } finally {
+            for (int i = 0; i < BROKER_COUNT; i++) {
+                if (tempDirectories[i] != null) {
+                    tempDirectories[i].delete();
+                }
             }
         }
-        bkEnsemble.stop();
     }
 
     @Test
@@ -217,9 +230,12 @@
     ) throws JsonProcessingException {
         File file = new File(jarFile);
         try {
-            ClassLoaderUtils.loadJar(file);
-        } catch (MalformedURLException e) {
-            throw new RuntimeException("Failed to load user jar " + file, e);
+            ClassLoader classLoader = ClassLoaderUtils.loadJar(file);
+            if (classLoader instanceof Closeable) {
+                ((Closeable) classLoader).close();
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to load user jar " + file, e);
         }
         String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index c824b79..7076b38 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -26,15 +26,12 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
-
 import java.lang.reflect.Method;
 import java.net.URI;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -47,7 +44,6 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -79,6 +75,7 @@
     final String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     @BeforeMethod(timeOut = 60000)
     void setup(Method method) throws Exception {
@@ -135,11 +132,17 @@
             bkEnsemble.stop();
         } catch (Exception e) {
             log.warn("Encountered errors at shutting down PulsarWorkerAssignmentTest", e);
+        } finally {
+            if (tempDirectory != null) {
+                tempDirectory.delete();
+            }
         }
     }
 
     private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
         workerConfig = new WorkerConfig();
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index ece1b2f..f8d7a7e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -33,6 +33,7 @@
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -49,7 +50,6 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -60,7 +60,6 @@
 import java.util.regex.Pattern;
 import lombok.Cleanup;
 import lombok.ToString;
-import org.apache.commons.io.FileUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -101,6 +100,7 @@
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FileServer;
 import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -133,6 +133,7 @@
     String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
+    PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
     private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
@@ -150,15 +151,6 @@
 
     @BeforeMethod
     void setup(Method method) throws Exception {
-
-        // delete all function temp files
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        for (File file : foundFiles) {
-            file.delete();
-        }
-
         log.info("--- Setting up method {} ---", method.getName());
 
         // Start local bookkeeper ensemble
@@ -206,19 +198,11 @@
         if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
             File connectorsDir = new File(workerConfig.getConnectorsDirectory());
 
-            if (connectorsDir.exists()) {
-                FileUtils.deleteDirectory(connectorsDir);
-            }
+            File file = getPulsarIODataGeneratorNar();
+            Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
 
-            if (connectorsDir.mkdir()) {
-                File file = getPulsarIODataGeneratorNar();
-                Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
-
-                file = getPulsarIOBatchDataGeneratorNar();
-                Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
-            } else {
-                throw new RuntimeException("Failed to create builtin connectors directory");
-            }
+            file = getPulsarIOBatchDataGeneratorNar();
+            Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
         }
 
         Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
@@ -278,16 +262,17 @@
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {
         log.info("--- Shutting down ---");
-        fileServer.stop();
-        pulsarClient.close();
-        admin.close();
-        functionsWorkerService.stop();
-        pulsar.close();
-        bkEnsemble.stop();
-
-        File connectorsDir = new File(workerConfig.getConnectorsDirectory());
-        if (connectorsDir.exists()) {
-            FileUtils.deleteDirectory(connectorsDir);
+        try {
+            fileServer.stop();
+            pulsarClient.close();
+            admin.close();
+            functionsWorkerService.stop();
+            pulsar.close();
+            bkEnsemble.stop();
+        } finally {
+            if (tempDirectory != null) {
+                tempDirectory.delete();
+            }
         }
     }
 
@@ -297,6 +282,8 @@
                 FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
 
         workerConfig = new WorkerConfig();
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -326,8 +313,6 @@
         workerConfig.setAuthenticationEnabled(true);
         workerConfig.setAuthorizationEnabled(true);
 
-        workerConfig.setConnectorsDirectory(Files.createTempDirectory("tempconnectorsdir").toFile().getAbsolutePath());
-
         PulsarWorkerService workerService = new PulsarWorkerService();
         workerService.init(workerConfig, null, false);
         return workerService;
@@ -458,11 +443,7 @@
             assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
         });
 
-        // make sure all temp files are deleted
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
     }
 
     @Test(timeOut = 20000)
@@ -900,11 +881,7 @@
         // make sure subscriptions are cleanup
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
 
-        // make sure all temp files are deleted
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
     }
 
     @Test(timeOut = 20000, groups = "builtin")
@@ -1053,11 +1030,7 @@
         assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertTrue(m.value > 0.0);
 
-        // make sure all temp files are deleted
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        Assert.assertEquals(Objects.requireNonNull(foundFiles).length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
         admin.sources().deleteSource(tenant, namespacePortion, sourceName);
     }
 
@@ -1203,11 +1176,7 @@
         assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
         assertTrue(m.value > 0.0);
 
-        // make sure all temp files are deleted
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
         admin.sources().deleteSource(tenant, namespacePortion, sourceName);
     }
 
@@ -1557,11 +1526,7 @@
         // make sure subscriptions are cleanup
         assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
 
-        // make sure all temp files are deleted
-        File dir = new File(System.getProperty("java.io.tmpdir"));
-        File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
-        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+        tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
     }
 
     @Test(timeOut = 20000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 7d1fd77..6dad9a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -27,17 +27,17 @@
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Sets;
-
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Method;
-import java.net.MalformedURLException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
@@ -57,9 +57,10 @@
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.PulsarWorkerService.PulsarClientCreator;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -94,6 +95,7 @@
     private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
 
     private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     @BeforeMethod
     void setup(Method method) throws Exception {
@@ -167,15 +169,23 @@
     @AfterMethod(alwaysRun = true)
     void shutdown() throws Exception {
         log.info("--- Shutting down ---");
-        functionAdmin.close();
-        bkEnsemble.stop();
-        workerServer.stop();
-        functionsWorkerService.stop();
+        try {
+            functionAdmin.close();
+            bkEnsemble.stop();
+            workerServer.stop();
+            functionsWorkerService.stop();
+        } finally {
+            if (tempDirectory != null) {
+                tempDirectory.delete();
+            }
+        }
     }
 
     private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config,
                                                            PulsarAdmin mockPulsarAdmin) {
         workerConfig = new WorkerConfig();
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
         workerConfig.setSchedulerClassName(
                 org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -262,9 +272,12 @@
 
         File file = new File(jarFile);
         try {
-            ClassLoaderUtils.loadJar(file);
-        } catch (MalformedURLException e) {
-            throw new RuntimeException("Failed to load user jar " + file, e);
+            ClassLoader classLoader = ClassLoaderUtils.loadJar(file);
+            if (classLoader instanceof Closeable) {
+                ((Closeable) classLoader).close();
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to load user jar " + file, e);
         }
         String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 4eeeac4..16e6550 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -29,17 +29,19 @@
 
 import com.beust.jcommander.ParameterException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
-
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.nio.file.Files;
-import java.util.*;
-
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Sinks;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Sinks;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
@@ -50,6 +52,7 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
 import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
@@ -100,6 +103,9 @@
     private CmdSinks.UpdateSink updateSink;
     private CmdSinks.LocalSinkRunner localSinkRunner;
     private CmdSinks.DeleteSink deleteSink;
+    private ClassLoader oldContextClassLoader;
+    private ClassLoader jarClassLoader;
+
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -121,7 +127,21 @@
             throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
         }
         JAR_FILE_PATH = file.getFile();
-        Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
+        jarClassLoader = ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH));
+        oldContextClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(jarClassLoader);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws IOException {
+        if (jarClassLoader != null && jarClassLoader instanceof Closeable) {
+            ((Closeable) jarClassLoader).close();
+            jarClassLoader = null;
+        }
+        if (oldContextClassLoader != null) {
+            Thread.currentThread().setContextClassLoader(oldContextClassLoader);
+            oldContextClassLoader = null;
+        }
     }
 
     public SinkConfig getSinkConfig() {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 4e6d95a..d9ef39d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -29,10 +29,10 @@
 
 import com.beust.jcommander.ParameterException;
 import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
-
+import java.io.Closeable;
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Files;
-
 import org.apache.pulsar.admin.cli.utils.CmdUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Sources;
@@ -46,6 +46,7 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
 import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
@@ -81,6 +82,8 @@
     private CmdSources.UpdateSource updateSource;
     private CmdSources.LocalSourceRunner localSourceRunner;
     private CmdSources.DeleteSource deleteSource;
+    private ClassLoader oldContextClassLoader;
+    private ClassLoader jarClassLoader;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -98,7 +101,21 @@
         mockStatic(CmdFunctions.class);
         PowerMockito.doNothing().when(localSourceRunner).runCmd();
         JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
-        Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
+        jarClassLoader = ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH));
+        oldContextClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(jarClassLoader);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws IOException {
+        if (jarClassLoader != null && jarClassLoader instanceof Closeable) {
+            ((Closeable) jarClassLoader).close();
+            jarClassLoader = null;
+        }
+        if (oldContextClassLoader != null) {
+            Thread.currentThread().setContextClassLoader(oldContextClassLoader);
+            oldContextClassLoader = null;
+        }
     }
 
     public SourceConfig getSourceConfig() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
index f903208..5ae1698 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.common.util;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Array;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
@@ -183,13 +184,21 @@
      * @return true if class can be loaded from jar and false if otherwise
      */
     public static boolean classExistsInJar(java.io.File jar, String fqcn) {
+        java.net.URLClassLoader loader = null;
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
+            loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
             Class.forName(fqcn, false, loader);
-            loader.close();
             return true;
         } catch (ClassNotFoundException | NoClassDefFoundError | IOException e) {
             return false;
+        } finally {
+            if (loader != null) {
+                try {
+                    loader.close();
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
         }
     }
 
@@ -217,14 +226,22 @@
      */
     public static boolean classInJarImplementsIface(java.io.File jar, String fqcn, Class xface) {
         boolean ret = false;
+        java.net.URLClassLoader loader = null;
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
+            loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
             if (xface.isAssignableFrom(Class.forName(fqcn, false, loader))){
                 ret = true;
             }
-            loader.close();
         } catch (ClassNotFoundException | NoClassDefFoundError | IOException e) {
             throw new RuntimeException(e);
+        } finally {
+            if (loader != null) {
+                try {
+                    loader.close();
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
         }
         return ret;
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 02083ed..d761ccb 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,17 +22,12 @@
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import io.prometheus.client.CollectorRegistry;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
@@ -73,8 +68,6 @@
 import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
 import org.apache.pulsar.functions.utils.CryptoUtils;
 import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.utils.io.Connector;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
 import org.slf4j.Logger;
@@ -367,7 +360,9 @@
         try {
             record = this.source.read();
         } catch (Exception e) {
-            stats.incrSourceExceptions(e);
+            if (stats != null) {
+                stats.incrSourceExceptions(e);
+            }
             log.error("Encountered exception in source read", e);
             throw e;
         } finally {
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 7d84bd8..fdfdd90 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions;
 
 import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
+
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -28,6 +29,8 @@
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
@@ -50,7 +53,7 @@
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.FileUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -80,6 +83,8 @@
     private final AtomicBoolean running = new AtomicBoolean(false);
     private final List<RuntimeSpawner> spawners = new LinkedList<>();
     private final String narExtractionDirectory;
+    private final File narExtractionDirectoryCreated;
+    private final String connectorsDir;
     private final Thread shutdownHook;
     private ClassLoader userCodeClassLoader;
     private boolean userCodeClassLoaderCreated;
@@ -180,7 +185,8 @@
             stateStorageServiceUrl, String brokerServiceUrl, String clientAuthPlugin, String clientAuthParams,
                        boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled,
                        String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv,
-                       String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory) {
+                       String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory,
+                       String connectorsDirectory) {
         this.functionConfig = functionConfig;
         this.sourceConfig = sourceConfig;
         this.sinkConfig = sinkConfig;
@@ -196,8 +202,22 @@
         this.runtimeEnv = runtimeEnv;
         this.secretsProviderClassName = secretsProviderClassName;
         this.secretsProviderConfig = secretsProviderConfig;
-        this.narExtractionDirectory = narExtractionDirectory != null ? narExtractionDirectory
-                : NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+        if (narExtractionDirectory != null) {
+            this.narExtractionDirectoryCreated = null;
+            this.narExtractionDirectory = narExtractionDirectory;
+        } else {
+            this.narExtractionDirectoryCreated = createNarExtractionTempDirectory();
+            this.narExtractionDirectory = this.narExtractionDirectoryCreated.getAbsolutePath();
+        }
+        if (connectorsDirectory != null) {
+            this.connectorsDir = connectorsDirectory;
+        } else {
+            String pulsarHome = System.getenv("PULSAR_HOME");
+            if (pulsarHome == null) {
+                pulsarHome = Paths.get("").toAbsolutePath().toString();
+            }
+            this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
+        }
         shutdownHook = new Thread() {
             public void run() {
                 LocalRunner.this.stop();
@@ -205,9 +225,23 @@
         };
     }
 
+    private static File createNarExtractionTempDirectory() {
+        try {
+            return Files.createTempDirectory("pulsar_localrunner_nars_").toFile();
+        } catch (IOException e) {
+            throw new UncheckedIOException("Cannot create temp directory", e);
+        }
+    }
+
     @Override
     public void close() throws Exception {
-        stop();
+        try {
+            stop();
+        } finally {
+            if (narExtractionDirectoryCreated != null) {
+                FileUtils.deleteFile(narExtractionDirectoryCreated, true);
+            }
+        }
     }
 
     public synchronized void stop() {
@@ -261,7 +295,7 @@
                         String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
                         userCodeFile = functions.getFunctions().get(functionType).toString();
                     }
-                     
+
                     if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
                         File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
                         userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
@@ -584,12 +618,6 @@
     }
 
     private TreeMap<String, Connector> getConnectors() throws IOException {
-        // Validate the connector source type from the locally available connectors
-        String pulsarHome = System.getenv("PULSAR_HOME");
-        if (pulsarHome == null) {
-            pulsarHome = Paths.get("").toAbsolutePath().toString();
-        }
-        String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
         return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index db62afd..5d2f8ee 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -18,7 +18,29 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 import com.google.protobuf.ByteString;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -45,28 +67,6 @@
 import org.apache.pulsar.packages.management.core.common.PackageType;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
 @Slf4j
 public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWorkerService> {
 
@@ -763,8 +763,19 @@
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
-        File file = Files.createTempFile("function", ".tmp").toFile();
-        worker().getBrokerAdmin().packages().download(packageName, file.toString());
+        return downloadPackageFile(worker(), packageName);
+    }
+
+    static File downloadPackageFile(PulsarWorkerService worker, String packageName) throws IOException, PulsarAdminException {
+        Path tempDirectory;
+        if (worker.getWorkerConfig().getDownloadDirectory() != null) {
+            tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
+        } else {
+            // use the Nar extraction directory as a temporary directory for downloaded files
+            tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
+        }
+        File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
+        worker.getBrokerAdmin().packages().download(packageName, file.toString());
         return file;
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index c059e18..92d52f0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -18,7 +18,26 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 import com.google.protobuf.ByteString;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -46,22 +65,6 @@
 import org.apache.pulsar.packages.management.core.common.PackageType;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.function.Supplier;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
 @Slf4j
 public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerService> {
 
@@ -739,14 +742,11 @@
         return SinkConfigUtils.convert(sinkConfig, sinkDetails);
     }
 
-
     private static boolean hasPackageTypePrefix(String destPkgUrl) {
         return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
-        File file = Files.createTempFile("function", ".tmp").toFile();
-        worker().getBrokerAdmin().packages().download(packageName, file.toString());
-        return file;
+        return FunctionsImpl.downloadPackageFile(worker(), packageName);
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 089b67c..147f2f7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -18,7 +18,26 @@
  */
 package org.apache.pulsar.functions.worker.rest.api;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
 import com.google.protobuf.ByteString;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -46,22 +65,6 @@
 import org.apache.pulsar.packages.management.core.common.PackageType;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.function.Supplier;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
 @Slf4j
 public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerService> {
 
@@ -741,8 +744,6 @@
     }
 
     private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
-        File file = Files.createTempFile("function", ".tmp").toFile();
-        worker().getBrokerAdmin().packages().download(packageName, file.toString());
-        return file;
+        return FunctionsImpl.downloadPackageFile(worker(), packageName);
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 41755f7..03c1a14 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -50,6 +50,7 @@
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
@@ -122,6 +123,7 @@
     private InputStream mockedInputStream;
     private FormDataContentDisposition mockedFormData;
     private Function.FunctionMetaData mockedFunctionMetadata;
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -185,16 +187,25 @@
         WorkerConfig workerConfig = new WorkerConfig()
                 .setWorkerId(workerId)
                 .setWorkerPort(8080)
-                .setDownloadDirectory("/tmp/pulsar/functions")
                 .setFunctionMetadataTopicName("pulsar/functions")
                 .setNumFunctionPackageReplicas(3)
                 .setPulsarServiceUrl("pulsar://localhost:6650/");
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
 
         mockStatic(InstanceUtils.class);
-        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(Function.FunctionDetails.ComponentType.FUNCTION);    }
+        PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(Function.FunctionDetails.ComponentType.FUNCTION);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() {
+        if (tempDirectory != null) {
+            tempDirectory.delete();
+        }
+    }
 
     @Test
     public void testStatusEmpty() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/PulsarFunctionTestTemporaryDirectory.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/PulsarFunctionTestTemporaryDirectory.java
new file mode 100644
index 0000000..5f403dd
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/PulsarFunctionTestTemporaryDirectory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.testng.Assert;
+
+/**
+ * Creates a temporary directory that contains 3 subdirectories,
+ * "narExtractionDirectory", "downloadDirectory" and "connectorsDirectory",
+ * which are assigned to the provided workerConfig's respective settings with
+ * the {@link #useTemporaryDirectoriesForWorkerConfig(WorkerConfig)} method
+ */
+public class PulsarFunctionTestTemporaryDirectory {
+    private final File tempDirectory;
+    private final File narExtractionDirectory;
+    private final File downloadDirectory;
+    private final File connectorsDirectory;
+
+    private PulsarFunctionTestTemporaryDirectory(String tempDirectoryNamePrefix) throws IOException {
+        tempDirectory = Files.createTempDirectory(tempDirectoryNamePrefix).toFile();
+        narExtractionDirectory = new File(tempDirectory, "narExtractionDirectory");
+        narExtractionDirectory.mkdir();
+        downloadDirectory = new File(tempDirectory, "downloadDirectory");
+        downloadDirectory.mkdir();
+        connectorsDirectory = new File(tempDirectory, "connectorsDirectory");
+        connectorsDirectory.mkdir();
+    }
+
+    public static PulsarFunctionTestTemporaryDirectory create(String tempDirectoryNamePrefix) {
+        try {
+            return new PulsarFunctionTestTemporaryDirectory(tempDirectoryNamePrefix);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Cannot create temporary directory", e);
+        }
+    }
+
+    public void useTemporaryDirectoriesForWorkerConfig(WorkerConfig workerConfig) {
+        workerConfig.setNarExtractionDirectory(narExtractionDirectory.getAbsolutePath());
+        workerConfig.setDownloadDirectory(downloadDirectory.getAbsolutePath());
+        workerConfig.setConnectorsDirectory(connectorsDirectory.getAbsolutePath());
+    }
+
+    public void delete() {
+        try {
+            FileUtils.deleteDirectory(tempDirectory);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Cannot delete temporary directory", e);
+        }
+    }
+
+    public void assertThatFunctionDownloadTempFilesHaveBeenDeleted() {
+        // make sure all temp files are deleted
+        File[] foundFiles = downloadDirectory.listFiles((dir1, name) -> name.startsWith("function"));
+        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: "
+                + Arrays.asList(foundFiles));
+    }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index e040334..29fb11c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -85,12 +85,14 @@
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
 import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
@@ -144,6 +146,7 @@
     private FormDataContentDisposition mockedFormData;
     private FunctionMetaData mockedFunctionMetadata;
     private LeaderService mockedLeaderService;
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -184,10 +187,11 @@
         WorkerConfig workerConfig = new WorkerConfig()
                 .setWorkerId("test")
                 .setWorkerPort(8080)
-                .setDownloadDirectory("/tmp/pulsar/functions")
                 .setFunctionMetadataTopicName("pulsar/functions")
                 .setNumFunctionPackageReplicas(3)
                 .setPulsarServiceUrl("pulsar://localhost:6650/");
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
@@ -199,6 +203,13 @@
 
     }
 
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() {
+        if (tempDirectory != null) {
+            tempDirectory.delete();
+        }
+    }
+
     //
     // Register Functions
     //
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 3d356e8..27694db 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -83,6 +83,7 @@
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerUtils;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
 import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
@@ -91,6 +92,7 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
 import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
@@ -152,6 +154,7 @@
     private FunctionMetaData mockedFunctionMetadata;
     private LeaderService mockedLeaderService;
     private Packages mockedPackages;
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -195,10 +198,11 @@
         WorkerConfig workerConfig = new WorkerConfig()
             .setWorkerId("test")
             .setWorkerPort(8080)
-            .setDownloadDirectory("/tmp/pulsar/functions")
             .setFunctionMetadataTopicName("pulsar/functions")
             .setNumFunctionPackageReplicas(3)
             .setPulsarServiceUrl("pulsar://localhost:6650/");
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
@@ -206,6 +210,13 @@
         PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
     }
 
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() {
+        if (tempDirectory != null) {
+            tempDirectory.delete();
+        }
+    }
+
     //
     // Register Functions
     //
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 473b25e..97b2d96 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -78,6 +78,7 @@
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
 import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
@@ -85,6 +86,7 @@
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
@@ -134,6 +136,7 @@
     private FunctionMetaData mockedFunctionMetaData;
     private LeaderService mockedLeaderService;
     private Packages mockedPackages;
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH = "pulsar-io-cassandra.nar.path";
 
@@ -203,10 +206,11 @@
         WorkerConfig workerConfig = new WorkerConfig()
                 .setWorkerId("test")
                 .setWorkerPort(8080)
-                .setDownloadDirectory("/tmp/pulsar/functions")
                 .setFunctionMetadataTopicName("pulsar/functions")
                 .setNumFunctionPackageReplicas(3)
                 .setPulsarServiceUrl("pulsar://localhost:6650/");
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SinksImpl(() -> mockedWorkerService));
@@ -214,6 +218,13 @@
         PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SINK);
     }
 
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() {
+        if (tempDirectory != null) {
+            tempDirectory.delete();
+        }
+    }
+
     //
     // Register Functions
     //
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cd85ec6..5bd07da 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -81,6 +81,7 @@
 import org.apache.pulsar.functions.worker.PulsarWorkerService;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
 import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.mockito.Mockito;
@@ -89,6 +90,7 @@
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.IObjectFactory;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
@@ -132,6 +134,7 @@
     private FunctionMetaData mockedFunctionMetaData;
     private LeaderService mockedLeaderService;
     private Packages mockedPackages;
+    private PulsarFunctionTestTemporaryDirectory tempDirectory;
 
     private static NarClassLoader narClassLoader;
 
@@ -192,10 +195,11 @@
         WorkerConfig workerConfig = new WorkerConfig()
                 .setWorkerId("test")
                 .setWorkerPort(8080)
-                .setDownloadDirectory("/tmp/pulsar/functions")
                 .setFunctionMetadataTopicName("pulsar/functions")
                 .setNumFunctionPackageReplicas(3)
                 .setPulsarServiceUrl("pulsar://localhost:6650/");
+        tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+        tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new SourcesImpl(() -> mockedWorkerService));
@@ -203,6 +207,13 @@
         PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SOURCE);
     }
 
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() {
+        if (tempDirectory != null) {
+            tempDirectory.delete();
+        }
+    }
+
     //
     // Register Functions
     //