[Testing] Improve Functions unit tests by using unique nar extraction directory (#10032)
- Use unique temporary nar extraction directory for each test
- This fixes possible issues caused by running tests with testForkCount > 1 setting (current status in master branch)
- Delete directory consistently in AfterMethod
- Add feature LocalRunner to create a unique temp directory for extracting
nar files. Delete directory on close
- Fix some ClassLoader resource cleanup issues that came up while working on the changes
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java
index 6d1fbdb..ce3d770 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/FileServer.java
@@ -18,30 +18,40 @@
*/
package org.apache.pulsar.functions.worker;
-import com.google.api.Http;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpServer;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
+import java.net.URL;
import java.nio.file.Files;
import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
/**
* Simple http server for serving files in Pulsar Function test cases
*/
@Slf4j
public class FileServer implements AutoCloseable {
+ private static final String HEALTH_PATH = "/health";
private final HttpServer httpServer;
public FileServer() throws IOException {
httpServer = HttpServer.create(new InetSocketAddress(0), 0);
// creates a default executor
httpServer.setExecutor(null);
+ httpServer.createContext(HEALTH_PATH, he -> {
+ he.sendResponseHeaders(204, 0);
+ });
}
public void serveFile(String path, File file) {
+ assertTrue(file.exists(), file.getAbsolutePath() + " doesn't exist.");
httpServer.createContext(path, he -> {
try {
Headers headers = he.getResponseHeaders();
@@ -59,6 +69,29 @@
public void start() {
httpServer.start();
+ waitUntilServerIsAvailable();
+ }
+
+ private void waitUntilServerIsAvailable() {
+ // wait until server is available.
+ // There has been a few flakiness issues where the server hasn't been available when
+ // the system-under-test has started to download files
+ // this assertion will call the "/health" endpoint and check that 204 status code is returned.
+ Awaitility.await()
+ .ignoreExceptions()
+ .untilAsserted(() -> {
+ HttpURLConnection urlConnection = (HttpURLConnection) new URL(getUrl(HEALTH_PATH))
+ .openConnection();
+ urlConnection.setUseCaches(false);
+ urlConnection.setConnectTimeout(5000);
+ urlConnection.setReadTimeout(5000);
+ try {
+ urlConnection.connect();
+ assertEquals(urlConnection.getResponseCode(), 204);
+ } finally {
+ urlConnection.disconnect();
+ }
+ });
}
public void stop() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 9d7a796..fb3d1ca 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -30,9 +30,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import io.jsonwebtoken.SignatureAlgorithm;
-
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
@@ -42,9 +40,7 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-
import javax.crypto.SecretKey;
-
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -63,7 +59,6 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -112,6 +107,7 @@
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
private String adminToken;
private String brokerServiceUrl;
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
@DataProvider(name = "validRoleName")
public Object[][] validRoleName() {
@@ -210,12 +206,18 @@
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
- log.info("--- Shutting down ---");
- pulsarClient.close();
- superUserAdmin.close();
- functionsWorkerService.stop();
- pulsar.close();
- bkEnsemble.stop();
+ try {
+ log.info("--- Shutting down ---");
+ pulsarClient.close();
+ superUserAdmin.close();
+ functionsWorkerService.stop();
+ pulsar.close();
+ bkEnsemble.stop();
+ } finally {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
}
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
@@ -224,6 +226,8 @@
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
workerConfig = new WorkerConfig();
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index f24a520..fca0298 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -22,15 +22,14 @@
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
import static org.mockito.Mockito.spy;
-
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -48,7 +47,6 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
-import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -122,6 +120,7 @@
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private static final String SYSTEM_PROPERTY_NAME_NAR_FILE_PATH = "pulsar-io-data-generator.nar.path";
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
public static File getPulsarIODataGeneratorNar() {
return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_NAR_FILE_PATH)
@@ -177,15 +176,6 @@
@BeforeMethod
void setup(Method method) throws Exception {
-
- // delete all function temp files
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));
-
- for (File file : foundFiles) {
- file.delete();
- }
-
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
@@ -230,16 +220,8 @@
if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
File connectorsDir = new File(workerConfig.getConnectorsDirectory());
- if (connectorsDir.exists()) {
- FileUtils.deleteDirectory(connectorsDir);
- }
-
- if (connectorsDir.mkdir()) {
- File file = getPulsarIODataGeneratorNar();
- Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
- } else {
- throw new RuntimeException("Failed to create builtin connectors directory");
- }
+ File file = getPulsarIODataGeneratorNar();
+ Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
}
Optional<WorkerService> functionWorkerService = Optional.empty();
@@ -292,16 +274,17 @@
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
- log.info("--- Shutting down ---");
- fileServer.stop();
- pulsarClient.close();
- admin.close();
- pulsar.close();
- bkEnsemble.stop();
-
- File connectorsDir = new File(workerConfig.getConnectorsDirectory());
- if (connectorsDir.exists()) {
- FileUtils.deleteDirectory(connectorsDir);
+ try {
+ log.info("--- Shutting down ---");
+ fileServer.stop();
+ pulsarClient.close();
+ admin.close();
+ pulsar.close();
+ bkEnsemble.stop();
+ } finally {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
}
}
@@ -311,6 +294,8 @@
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
WorkerConfig workerConfig = new WorkerConfig();
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -681,7 +666,9 @@
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
.tlsAllowInsecureConnection(true)
.tlsHostNameVerificationEnabled(false)
- .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
+ .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
+ .connectorsDirectory(workerConfig.getConnectorsDirectory())
+ .build();
localRunner.start(false);
@@ -793,7 +780,9 @@
.tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
.tlsAllowInsecureConnection(true)
.tlsHostNameVerificationEnabled(false)
- .brokerServiceUrl(pulsar.getBrokerServiceUrlTls()).build();
+ .brokerServiceUrl(pulsar.getBrokerServiceUrlTls())
+ .connectorsDirectory(workerConfig.getConnectorsDirectory())
+ .build();
localRunner.start(false);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 0bf0886..d7597d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -28,6 +28,19 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.net.URI;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -50,7 +63,6 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
@@ -66,21 +78,6 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import java.io.File;
-import java.lang.reflect.Method;
-import java.net.URI;
-import java.net.URL;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
/**
* Test Pulsar function state
*/
@@ -107,6 +104,7 @@
private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
@DataProvider(name = "validRoleName")
public Object[][] validRoleName() {
@@ -115,15 +113,6 @@
@BeforeMethod
void setup(Method method) throws Exception {
-
- // delete all function temp files
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((ignoredDir, name) -> name.startsWith("function"));
-
- for (File file : foundFiles) {
- file.delete();
- }
-
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
@@ -214,17 +203,25 @@
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
- log.info("--- Shutting down ---");
- pulsarClient.close();
- admin.close();
- functionsWorkerService.stop();
- pulsar.close();
- bkEnsemble.stop();
+ try {
+ log.info("--- Shutting down ---");
+ pulsarClient.close();
+ admin.close();
+ functionsWorkerService.stop();
+ pulsar.close();
+ bkEnsemble.stop();
+ } finally {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
}
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig = new WorkerConfig();
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -366,11 +363,7 @@
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
- // make sure all temp files are deleted
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
}
@Test
@@ -507,11 +500,7 @@
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
- // make sure all temp files are deleted
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
DistributedLogConfiguration dlogConf = WorkerUtils.getDlogConf(workerConfig);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
new file mode 100644
index 0000000..b97d2a1
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTestTemporaryDirectory.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+
+/**
+ * Creates a temporary directory that contains 3 subdirectories,
+ * "narExtractionDirectory", "downloadDirectory" and "connectorsDirectory",
+ * which are assigned to the provided workerConfig's respective settings with
+ * the {@link #useTemporaryDirectoriesForWorkerConfig(WorkerConfig)} method
+ */
+public class PulsarFunctionTestTemporaryDirectory {
+ private final File tempDirectory;
+ private final File narExtractionDirectory;
+ private final File downloadDirectory;
+ private final File connectorsDirectory;
+
+ private PulsarFunctionTestTemporaryDirectory(String tempDirectoryNamePrefix) throws IOException {
+ tempDirectory = Files.createTempDirectory(tempDirectoryNamePrefix).toFile();
+ narExtractionDirectory = new File(tempDirectory, "narExtractionDirectory");
+ narExtractionDirectory.mkdir();
+ downloadDirectory = new File(tempDirectory, "downloadDirectory");
+ downloadDirectory.mkdir();
+ connectorsDirectory = new File(tempDirectory, "connectorsDirectory");
+ connectorsDirectory.mkdir();
+ }
+
+ public static PulsarFunctionTestTemporaryDirectory create(String tempDirectoryNamePrefix) {
+ try {
+ return new PulsarFunctionTestTemporaryDirectory(tempDirectoryNamePrefix);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot create temporary directory", e);
+ }
+ }
+
+ public void useTemporaryDirectoriesForWorkerConfig(WorkerConfig workerConfig) {
+ workerConfig.setNarExtractionDirectory(narExtractionDirectory.getAbsolutePath());
+ workerConfig.setDownloadDirectory(downloadDirectory.getAbsolutePath());
+ workerConfig.setConnectorsDirectory(connectorsDirectory.getAbsolutePath());
+ }
+
+ public void delete() {
+ try {
+ FileUtils.deleteDirectory(tempDirectory);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot delete temporary directory", e);
+ }
+ }
+
+ public void assertThatFunctionDownloadTempFilesHaveBeenDeleted() {
+ // make sure all temp files are deleted
+ File[] foundFiles = downloadDirectory.listFiles((dir1, name) -> name.startsWith("function"));
+ Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: "
+ + Arrays.asList(foundFiles));
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
index 54c24a7..61e7e61 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionTlsTest.java
@@ -18,11 +18,15 @@
*/
package org.apache.pulsar.functions.worker;
+import static org.testng.Assert.assertEquals;
+
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
+import java.io.Closeable;
import java.io.File;
-import java.net.MalformedURLException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -50,8 +54,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-
@Slf4j
@Test(groups = "functions-worker")
public class PulsarFunctionTlsTest {
@@ -72,6 +74,7 @@
protected String testCluster = "my-cluster";
protected String testTenant = "my-tenant";
protected String testNamespace = testTenant + "/my-ns";
+ private PulsarFunctionTestTemporaryDirectory[] tempDirectories = new PulsarFunctionTestTemporaryDirectory[BROKER_COUNT];
@BeforeMethod
void setup() throws Exception {
@@ -114,6 +117,8 @@
config.setTlsEnabled(true);
WorkerConfig workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(config, null);
+ tempDirectories[i] = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectories[i].useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace("public/functions");
workerConfig.setPulsarFunctionsCluster("my-cluster");
workerConfig.setSchedulerClassName(
@@ -169,15 +174,23 @@
@AfterMethod(alwaysRun = true)
void tearDown() throws Exception {
- for (int i = 0; i < BROKER_COUNT; i++) {
- if (pulsarServices[i] != null) {
- pulsarServices[i].close();
+ try {
+ for (int i = 0; i < BROKER_COUNT; i++) {
+ if (pulsarServices[i] != null) {
+ pulsarServices[i].close();
+ }
+ if (pulsarAdmins[i] != null) {
+ pulsarAdmins[i].close();
+ }
}
- if (pulsarAdmins[i] != null) {
- pulsarAdmins[i].close();
+ bkEnsemble.stop();
+ } finally {
+ for (int i = 0; i < BROKER_COUNT; i++) {
+ if (tempDirectories[i] != null) {
+ tempDirectories[i].delete();
+ }
}
}
- bkEnsemble.stop();
}
@Test
@@ -217,9 +230,12 @@
) throws JsonProcessingException {
File file = new File(jarFile);
try {
- ClassLoaderUtils.loadJar(file);
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
+ ClassLoader classLoader = ClassLoaderUtils.loadJar(file);
+ if (classLoader instanceof Closeable) {
+ ((Closeable) classLoader).close();
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to load user jar " + file, e);
}
String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index c824b79..7076b38 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -26,15 +26,12 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
-
import java.lang.reflect.Method;
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -47,7 +44,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -79,6 +75,7 @@
final String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
String workerId;
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
@BeforeMethod(timeOut = 60000)
void setup(Method method) throws Exception {
@@ -135,11 +132,17 @@
bkEnsemble.stop();
} catch (Exception e) {
log.warn("Encountered errors at shutting down PulsarWorkerAssignmentTest", e);
+ } finally {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
}
}
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
workerConfig = new WorkerConfig();
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index ece1b2f..f8d7a7e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -33,6 +33,7 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -49,7 +50,6 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -60,7 +60,6 @@
import java.util.regex.Pattern;
import lombok.Cleanup;
import lombok.ToString;
-import org.apache.commons.io.FileUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -101,6 +100,7 @@
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FileServer;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
@@ -133,6 +133,7 @@
String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
String primaryHost;
String workerId;
+ PulsarFunctionTestTemporaryDirectory tempDirectory;
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
@@ -150,15 +151,6 @@
@BeforeMethod
void setup(Method method) throws Exception {
-
- // delete all function temp files
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- for (File file : foundFiles) {
- file.delete();
- }
-
log.info("--- Setting up method {} ---", method.getName());
// Start local bookkeeper ensemble
@@ -206,19 +198,11 @@
if (Arrays.asList(method.getAnnotation(Test.class).groups()).contains("builtin")) {
File connectorsDir = new File(workerConfig.getConnectorsDirectory());
- if (connectorsDir.exists()) {
- FileUtils.deleteDirectory(connectorsDir);
- }
+ File file = getPulsarIODataGeneratorNar();
+ Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
- if (connectorsDir.mkdir()) {
- File file = getPulsarIODataGeneratorNar();
- Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
-
- file = getPulsarIOBatchDataGeneratorNar();
- Files.copy(file.toPath(), new File(connectorsDir.getAbsolutePath() + "/" + file.getName()).toPath());
- } else {
- throw new RuntimeException("Failed to create builtin connectors directory");
- }
+ file = getPulsarIOBatchDataGeneratorNar();
+ Files.copy(file.toPath(), new File(connectorsDir, file.getName()).toPath());
}
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
@@ -278,16 +262,17 @@
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
- fileServer.stop();
- pulsarClient.close();
- admin.close();
- functionsWorkerService.stop();
- pulsar.close();
- bkEnsemble.stop();
-
- File connectorsDir = new File(workerConfig.getConnectorsDirectory());
- if (connectorsDir.exists()) {
- FileUtils.deleteDirectory(connectorsDir);
+ try {
+ fileServer.stop();
+ pulsarClient.close();
+ admin.close();
+ functionsWorkerService.stop();
+ pulsar.close();
+ bkEnsemble.stop();
+ } finally {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
}
}
@@ -297,6 +282,8 @@
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
workerConfig = new WorkerConfig();
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -326,8 +313,6 @@
workerConfig.setAuthenticationEnabled(true);
workerConfig.setAuthorizationEnabled(true);
- workerConfig.setConnectorsDirectory(Files.createTempDirectory("tempconnectorsdir").toFile().getAbsolutePath());
-
PulsarWorkerService workerService = new PulsarWorkerService();
workerService.init(workerConfig, null, false);
return workerService;
@@ -458,11 +443,7 @@
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
});
- // make sure all temp files are deleted
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
}
@Test(timeOut = 20000)
@@ -900,11 +881,7 @@
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
- // make sure all temp files are deleted
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
}
@Test(timeOut = 20000, groups = "builtin")
@@ -1053,11 +1030,7 @@
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
- // make sure all temp files are deleted
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- Assert.assertEquals(Objects.requireNonNull(foundFiles).length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
admin.sources().deleteSource(tenant, namespacePortion, sourceName);
}
@@ -1203,11 +1176,7 @@
assertEquals(m.tags.get("fqfn"), FunctionCommon.getFullyQualifiedName(tenant, namespacePortion, sourceName));
assertTrue(m.value > 0.0);
- // make sure all temp files are deleted
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
admin.sources().deleteSource(tenant, namespacePortion, sourceName);
}
@@ -1557,11 +1526,7 @@
// make sure subscriptions are cleanup
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
- // make sure all temp files are deleted
- File dir = new File(System.getProperty("java.io.tmpdir"));
- File[] foundFiles = dir.listFiles((dir1, name) -> name.startsWith("function"));
-
- Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ tempDirectory.assertThatFunctionDownloadTempFilesHaveBeenDeleted();
}
@Test(timeOut = 20000)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 7d1fd77..6dad9a4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -27,17 +27,17 @@
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
-
+import java.io.Closeable;
import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.reflect.Method;
-import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
@@ -57,9 +57,10 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
-import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.worker.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.PulsarWorkerService.PulsarClientCreator;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -94,6 +95,7 @@
private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionTlsTest.class);
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
@BeforeMethod
void setup(Method method) throws Exception {
@@ -167,15 +169,23 @@
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
- functionAdmin.close();
- bkEnsemble.stop();
- workerServer.stop();
- functionsWorkerService.stop();
+ try {
+ functionAdmin.close();
+ bkEnsemble.stop();
+ workerServer.stop();
+ functionsWorkerService.stop();
+ } finally {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
}
private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration config,
PulsarAdmin mockPulsarAdmin) {
workerConfig = new WorkerConfig();
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
@@ -262,9 +272,12 @@
File file = new File(jarFile);
try {
- ClassLoaderUtils.loadJar(file);
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
+ ClassLoader classLoader = ClassLoaderUtils.loadJar(file);
+ if (classLoader instanceof Closeable) {
+ ((Closeable) classLoader).close();
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Failed to load user jar " + file, e);
}
String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 4eeeac4..16e6550 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -29,17 +29,19 @@
import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
-
+import java.io.Closeable;
import java.io.File;
+import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
-import java.util.*;
-
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.pulsar.admin.cli.utils.CmdUtils;
-import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.Sinks;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
@@ -50,6 +52,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@@ -100,6 +103,9 @@
private CmdSinks.UpdateSink updateSink;
private CmdSinks.LocalSinkRunner localSinkRunner;
private CmdSinks.DeleteSink deleteSink;
+ private ClassLoader oldContextClassLoader;
+ private ClassLoader jarClassLoader;
+
@BeforeMethod
public void setup() throws Exception {
@@ -121,7 +127,21 @@
throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
}
JAR_FILE_PATH = file.getFile();
- Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
+ jarClassLoader = ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH));
+ oldContextClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(jarClassLoader);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() throws IOException {
+ if (jarClassLoader != null && jarClassLoader instanceof Closeable) {
+ ((Closeable) jarClassLoader).close();
+ jarClassLoader = null;
+ }
+ if (oldContextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(oldContextClassLoader);
+ oldContextClassLoader = null;
+ }
}
public SinkConfig getSinkConfig() {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 4e6d95a..d9ef39d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -29,10 +29,10 @@
import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.dataformat.yaml.YAMLMapper;
-
+import java.io.Closeable;
import java.io.File;
+import java.io.IOException;
import java.nio.file.Files;
-
import org.apache.pulsar.admin.cli.utils.CmdUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Sources;
@@ -46,6 +46,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@@ -81,6 +82,8 @@
private CmdSources.UpdateSource updateSource;
private CmdSources.LocalSourceRunner localSourceRunner;
private CmdSources.DeleteSource deleteSource;
+ private ClassLoader oldContextClassLoader;
+ private ClassLoader jarClassLoader;
@BeforeMethod
public void setup() throws Exception {
@@ -98,7 +101,21 @@
mockStatic(CmdFunctions.class);
PowerMockito.doNothing().when(localSourceRunner).runCmd();
JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
- Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
+ jarClassLoader = ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH));
+ oldContextClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(jarClassLoader);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() throws IOException {
+ if (jarClassLoader != null && jarClassLoader instanceof Closeable) {
+ ((Closeable) jarClassLoader).close();
+ jarClassLoader = null;
+ }
+ if (oldContextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(oldContextClassLoader);
+ oldContextClassLoader = null;
+ }
}
public SourceConfig getSourceConfig() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
index f903208..5ae1698 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.common.util;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
@@ -183,13 +184,21 @@
* @return true if class can be loaded from jar and false if otherwise
*/
public static boolean classExistsInJar(java.io.File jar, String fqcn) {
+ java.net.URLClassLoader loader = null;
try {
- java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
+ loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
Class.forName(fqcn, false, loader);
- loader.close();
return true;
} catch (ClassNotFoundException | NoClassDefFoundError | IOException e) {
return false;
+ } finally {
+ if (loader != null) {
+ try {
+ loader.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
}
@@ -217,14 +226,22 @@
*/
public static boolean classInJarImplementsIface(java.io.File jar, String fqcn, Class xface) {
boolean ret = false;
+ java.net.URLClassLoader loader = null;
try {
- java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
+ loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
if (xface.isAssignableFrom(Class.forName(fqcn, false, loader))){
ret = true;
}
- loader.close();
} catch (ClassNotFoundException | NoClassDefFoundError | IOException e) {
throw new RuntimeException(e);
+ } finally {
+ if (loader != null) {
+ try {
+ loader.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
}
return ret;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 02083ed..d761ccb 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,17 +22,12 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.prometheus.client.CollectorRegistry;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
@@ -73,8 +68,6 @@
import org.apache.pulsar.functions.source.batch.BatchSourceExecutor;
import org.apache.pulsar.functions.utils.CryptoUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.slf4j.Logger;
@@ -367,7 +360,9 @@
try {
record = this.source.read();
} catch (Exception e) {
- stats.incrSourceExceptions(e);
+ if (stats != null) {
+ stats.incrSourceExceptions(e);
+ }
log.error("Encountered exception in source read", e);
throw e;
} finally {
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 7d84bd8..fdfdd90 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
+
import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
@@ -28,6 +29,8 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
@@ -50,7 +53,7 @@
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -80,6 +83,8 @@
private final AtomicBoolean running = new AtomicBoolean(false);
private final List<RuntimeSpawner> spawners = new LinkedList<>();
private final String narExtractionDirectory;
+ private final File narExtractionDirectoryCreated;
+ private final String connectorsDir;
private final Thread shutdownHook;
private ClassLoader userCodeClassLoader;
private boolean userCodeClassLoaderCreated;
@@ -180,7 +185,8 @@
stateStorageServiceUrl, String brokerServiceUrl, String clientAuthPlugin, String clientAuthParams,
boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled,
String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv,
- String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory) {
+ String secretsProviderClassName, String secretsProviderConfig, String narExtractionDirectory,
+ String connectorsDirectory) {
this.functionConfig = functionConfig;
this.sourceConfig = sourceConfig;
this.sinkConfig = sinkConfig;
@@ -196,8 +202,22 @@
this.runtimeEnv = runtimeEnv;
this.secretsProviderClassName = secretsProviderClassName;
this.secretsProviderConfig = secretsProviderConfig;
- this.narExtractionDirectory = narExtractionDirectory != null ? narExtractionDirectory
- : NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR;
+ if (narExtractionDirectory != null) {
+ this.narExtractionDirectoryCreated = null;
+ this.narExtractionDirectory = narExtractionDirectory;
+ } else {
+ this.narExtractionDirectoryCreated = createNarExtractionTempDirectory();
+ this.narExtractionDirectory = this.narExtractionDirectoryCreated.getAbsolutePath();
+ }
+ if (connectorsDirectory != null) {
+ this.connectorsDir = connectorsDirectory;
+ } else {
+ String pulsarHome = System.getenv("PULSAR_HOME");
+ if (pulsarHome == null) {
+ pulsarHome = Paths.get("").toAbsolutePath().toString();
+ }
+ this.connectorsDir = Paths.get(pulsarHome, "connectors").toString();
+ }
shutdownHook = new Thread() {
public void run() {
LocalRunner.this.stop();
@@ -205,9 +225,23 @@
};
}
+ private static File createNarExtractionTempDirectory() {
+ try {
+ return Files.createTempDirectory("pulsar_localrunner_nars_").toFile();
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot create temp directory", e);
+ }
+ }
+
@Override
public void close() throws Exception {
- stop();
+ try {
+ stop();
+ } finally {
+ if (narExtractionDirectoryCreated != null) {
+ FileUtils.deleteFile(narExtractionDirectoryCreated, true);
+ }
+ }
}
public synchronized void stop() {
@@ -261,7 +295,7 @@
String functionType = functionConfig.getJar().replaceFirst("^builtin://", "");
userCodeFile = functions.getFunctions().get(functionType).toString();
}
-
+
if (Utils.isFunctionPackageUrlSupported(userCodeFile)) {
File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
userCodeClassLoader = FunctionConfigUtils.validate(functionConfig, file);
@@ -584,12 +618,6 @@
}
private TreeMap<String, Connector> getConnectors() throws IOException {
- // Validate the connector source type from the locally available connectors
- String pulsarHome = System.getenv("PULSAR_HOME");
- if (pulsarHome == null) {
- pulsarHome = Paths.get("").toAbsolutePath().toString();
- }
- String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index db62afd..5d2f8ee 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -18,7 +18,29 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
import com.google.protobuf.ByteString;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -45,28 +67,6 @@
import org.apache.pulsar.packages.management.core.common.PackageType;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.Supplier;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
@Slf4j
public class FunctionsImpl extends ComponentImpl implements Functions<PulsarWorkerService> {
@@ -763,8 +763,19 @@
}
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
- File file = Files.createTempFile("function", ".tmp").toFile();
- worker().getBrokerAdmin().packages().download(packageName, file.toString());
+ return downloadPackageFile(worker(), packageName);
+ }
+
+ static File downloadPackageFile(PulsarWorkerService worker, String packageName) throws IOException, PulsarAdminException {
+ Path tempDirectory;
+ if (worker.getWorkerConfig().getDownloadDirectory() != null) {
+ tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory());
+ } else {
+ // use the Nar extraction directory as a temporary directory for downloaded files
+ tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory());
+ }
+ File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile();
+ worker.getBrokerAdmin().packages().download(packageName, file.toString());
return file;
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index c059e18..92d52f0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -18,7 +18,26 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
import com.google.protobuf.ByteString;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -46,22 +65,6 @@
import org.apache.pulsar.packages.management.core.common.PackageType;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.function.Supplier;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
@Slf4j
public class SinksImpl extends ComponentImpl implements Sinks<PulsarWorkerService> {
@@ -739,14 +742,11 @@
return SinkConfigUtils.convert(sinkConfig, sinkDetails);
}
-
private static boolean hasPackageTypePrefix(String destPkgUrl) {
return Arrays.stream(PackageType.values()).anyMatch(type -> destPkgUrl.startsWith(type.toString()));
}
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
- File file = Files.createTempFile("function", ".tmp").toFile();
- worker().getBrokerAdmin().packages().download(packageName, file.toString());
- return file;
+ return FunctionsImpl.downloadPackageFile(worker(), packageName);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index 089b67c..147f2f7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -18,7 +18,26 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
import com.google.protobuf.ByteString;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
@@ -46,22 +65,6 @@
import org.apache.pulsar.packages.management.core.common.PackageType;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
-import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.Response;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.nio.file.Files;
-import java.util.*;
-import java.util.function.Supplier;
-
-import static org.apache.commons.lang3.StringUtils.isBlank;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
-import static org.apache.pulsar.functions.utils.FunctionCommon.isFunctionCodeBuiltin;
-import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
-
@Slf4j
public class SourcesImpl extends ComponentImpl implements Sources<PulsarWorkerService> {
@@ -741,8 +744,6 @@
}
private File downloadPackageFile(String packageName) throws IOException, PulsarAdminException {
- File file = Files.createTempFile("function", ".tmp").toFile();
- worker().getBrokerAdmin().packages().download(packageName, file.toString());
- return file;
+ return FunctionsImpl.downloadPackageFile(worker(), packageName);
}
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 41755f7..03c1a14 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -50,6 +50,7 @@
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@@ -122,6 +123,7 @@
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
private Function.FunctionMetaData mockedFunctionMetadata;
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
@BeforeMethod
public void setup() throws Exception {
@@ -185,16 +187,25 @@
WorkerConfig workerConfig = new WorkerConfig()
.setWorkerId(workerId)
.setWorkerPort(8080)
- .setDownloadDirectory("/tmp/pulsar/functions")
.setFunctionMetadataTopicName("pulsar/functions")
.setNumFunctionPackageReplicas(3)
.setPulsarServiceUrl("pulsar://localhost:6650/");
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
mockStatic(InstanceUtils.class);
- PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(Function.FunctionDetails.ComponentType.FUNCTION); }
+ PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(Function.FunctionDetails.ComponentType.FUNCTION);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
@Test
public void testStatusEmpty() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/PulsarFunctionTestTemporaryDirectory.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/PulsarFunctionTestTemporaryDirectory.java
new file mode 100644
index 0000000..5f403dd
--- /dev/null
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/PulsarFunctionTestTemporaryDirectory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import org.apache.commons.io.FileUtils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.testng.Assert;
+
+/**
+ * Creates a temporary directory that contains 3 subdirectories,
+ * "narExtractionDirectory", "downloadDirectory" and "connectorsDirectory",
+ * which are assigned to the provided workerConfig's respective settings with
+ * the {@link #useTemporaryDirectoriesForWorkerConfig(WorkerConfig)} method
+ */
+public class PulsarFunctionTestTemporaryDirectory {
+ private final File tempDirectory;
+ private final File narExtractionDirectory;
+ private final File downloadDirectory;
+ private final File connectorsDirectory;
+
+ private PulsarFunctionTestTemporaryDirectory(String tempDirectoryNamePrefix) throws IOException {
+ tempDirectory = Files.createTempDirectory(tempDirectoryNamePrefix).toFile();
+ narExtractionDirectory = new File(tempDirectory, "narExtractionDirectory");
+ narExtractionDirectory.mkdir();
+ downloadDirectory = new File(tempDirectory, "downloadDirectory");
+ downloadDirectory.mkdir();
+ connectorsDirectory = new File(tempDirectory, "connectorsDirectory");
+ connectorsDirectory.mkdir();
+ }
+
+ public static PulsarFunctionTestTemporaryDirectory create(String tempDirectoryNamePrefix) {
+ try {
+ return new PulsarFunctionTestTemporaryDirectory(tempDirectoryNamePrefix);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot create temporary directory", e);
+ }
+ }
+
+ public void useTemporaryDirectoriesForWorkerConfig(WorkerConfig workerConfig) {
+ workerConfig.setNarExtractionDirectory(narExtractionDirectory.getAbsolutePath());
+ workerConfig.setDownloadDirectory(downloadDirectory.getAbsolutePath());
+ workerConfig.setConnectorsDirectory(connectorsDirectory.getAbsolutePath());
+ }
+
+ public void delete() {
+ try {
+ FileUtils.deleteDirectory(tempDirectory);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot delete temporary directory", e);
+ }
+ }
+
+ public void assertThatFunctionDownloadTempFilesHaveBeenDeleted() {
+ // make sure all temp files are deleted
+ File[] foundFiles = downloadDirectory.listFiles((dir1, name) -> name.startsWith("function"));
+ Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: "
+ + Arrays.asList(foundFiles));
+ }
+}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index e040334..29fb11c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -85,12 +85,14 @@
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@@ -144,6 +146,7 @@
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetadata;
private LeaderService mockedLeaderService;
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
@BeforeMethod
public void setup() throws Exception {
@@ -184,10 +187,11 @@
WorkerConfig workerConfig = new WorkerConfig()
.setWorkerId("test")
.setWorkerPort(8080)
- .setDownloadDirectory("/tmp/pulsar/functions")
.setFunctionMetadataTopicName("pulsar/functions")
.setNumFunctionPackageReplicas(3)
.setPulsarServiceUrl("pulsar://localhost:6650/");
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
@@ -199,6 +203,13 @@
}
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
+
//
// Register Functions
//
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 3d356e8..27694db 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -83,6 +83,7 @@
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionsApiV2Resource;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.mockito.Mockito;
@@ -91,6 +92,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@@ -152,6 +154,7 @@
private FunctionMetaData mockedFunctionMetadata;
private LeaderService mockedLeaderService;
private Packages mockedPackages;
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
@BeforeMethod
public void setup() throws Exception {
@@ -195,10 +198,11 @@
WorkerConfig workerConfig = new WorkerConfig()
.setWorkerId("test")
.setWorkerPort(8080)
- .setDownloadDirectory("/tmp/pulsar/functions")
.setFunctionMetadataTopicName("pulsar/functions")
.setNumFunctionPackageReplicas(3)
.setPulsarServiceUrl("pulsar://localhost:6650/");
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
@@ -206,6 +210,13 @@
PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
}
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
+
//
// Register Functions
//
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 473b25e..97b2d96 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -78,6 +78,7 @@
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.mockito.Mockito;
@@ -85,6 +86,7 @@
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
@@ -134,6 +136,7 @@
private FunctionMetaData mockedFunctionMetaData;
private LeaderService mockedLeaderService;
private Packages mockedPackages;
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH = "pulsar-io-cassandra.nar.path";
@@ -203,10 +206,11 @@
WorkerConfig workerConfig = new WorkerConfig()
.setWorkerId("test")
.setWorkerPort(8080)
- .setDownloadDirectory("/tmp/pulsar/functions")
.setFunctionMetadataTopicName("pulsar/functions")
.setNumFunctionPackageReplicas(3)
.setPulsarServiceUrl("pulsar://localhost:6650/");
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SinksImpl(() -> mockedWorkerService));
@@ -214,6 +218,13 @@
PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SINK);
}
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
+
//
// Register Functions
//
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cd85ec6..5bd07da 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -81,6 +81,7 @@
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
+import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.mockito.Mockito;
@@ -89,6 +90,7 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.IObjectFactory;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
@@ -132,6 +134,7 @@
private FunctionMetaData mockedFunctionMetaData;
private LeaderService mockedLeaderService;
private Packages mockedPackages;
+ private PulsarFunctionTestTemporaryDirectory tempDirectory;
private static NarClassLoader narClassLoader;
@@ -192,10 +195,11 @@
WorkerConfig workerConfig = new WorkerConfig()
.setWorkerId("test")
.setWorkerPort(8080)
- .setDownloadDirectory("/tmp/pulsar/functions")
.setFunctionMetadataTopicName("pulsar/functions")
.setNumFunctionPackageReplicas(3)
.setPulsarServiceUrl("pulsar://localhost:6650/");
+ tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
+ tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
this.resource = spy(new SourcesImpl(() -> mockedWorkerService));
@@ -203,6 +207,13 @@
PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.SOURCE);
}
+ @AfterMethod(alwaysRun = true)
+ public void cleanup() {
+ if (tempDirectory != null) {
+ tempDirectory.delete();
+ }
+ }
+
//
// Register Functions
//