| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.functions.worker.rest.api.v3; |
| |
| import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| import static org.mockito.Mockito.withSettings; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.UncheckedIOException; |
| import java.lang.reflect.Method; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.nio.file.StandardCopyOption; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.function.Consumer; |
| import org.apache.distributedlog.api.namespace.Namespace; |
| import org.apache.pulsar.client.admin.Functions; |
| import org.apache.pulsar.client.admin.Namespaces; |
| import org.apache.pulsar.client.admin.Packages; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.Tenants; |
| import org.apache.pulsar.common.functions.FunctionDefinition; |
| import org.apache.pulsar.common.io.ConnectorDefinition; |
| import org.apache.pulsar.common.nar.NarClassLoader; |
| import org.apache.pulsar.common.policies.data.TenantInfoImpl; |
| import org.apache.pulsar.functions.instance.InstanceUtils; |
| import org.apache.pulsar.functions.proto.Function; |
| import org.apache.pulsar.functions.runtime.RuntimeFactory; |
| import org.apache.pulsar.functions.source.TopicSchema; |
| import org.apache.pulsar.functions.utils.LoadedFunctionPackage; |
| import org.apache.pulsar.functions.utils.ValidatableFunctionPackage; |
| import org.apache.pulsar.functions.utils.functions.FunctionArchive; |
| import org.apache.pulsar.functions.utils.functions.FunctionUtils; |
| import org.apache.pulsar.functions.utils.io.Connector; |
| import org.apache.pulsar.functions.utils.io.ConnectorUtils; |
| import org.apache.pulsar.functions.worker.ConnectorsManager; |
| import org.apache.pulsar.functions.worker.FunctionMetaDataManager; |
| import org.apache.pulsar.functions.worker.FunctionRuntimeManager; |
| import org.apache.pulsar.functions.worker.FunctionsManager; |
| import org.apache.pulsar.functions.worker.LeaderService; |
| import org.apache.pulsar.functions.worker.PackageUrlValidator; |
| import org.apache.pulsar.functions.worker.PulsarWorkerService; |
| import org.apache.pulsar.functions.worker.WorkerConfig; |
| import org.apache.pulsar.functions.worker.WorkerUtils; |
| import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory; |
| import org.glassfish.jersey.media.multipart.FormDataContentDisposition; |
| import org.mockito.Answers; |
| import org.mockito.MockSettings; |
| import org.mockito.MockedStatic; |
| import org.mockito.Mockito; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| |
| public abstract class AbstractFunctionsResourceTest { |
| |
| protected static final String tenant = "test-tenant"; |
| protected static final String namespace = "test-namespace"; |
| protected static final Map<String, String> topicsToSerDeClassName = new HashMap<>(); |
| protected static final String subscriptionName = "test-subscription"; |
| protected static final String CASSANDRA_STRING_SINK = "org.apache.pulsar.io.cassandra.CassandraStringSink"; |
| protected static final int parallelism = 1; |
| private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH = "pulsar-io-cassandra.nar.path"; |
| private static final String SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH = "pulsar-io-twitter.nar.path"; |
| private static final String SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH = "pulsar-io-invalid.nar.path"; |
| private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH = |
| "pulsar-functions-api-examples.nar.path"; |
| protected static Map<String, MockedStatic> mockStaticContexts = new HashMap<>(); |
| |
| static { |
| topicsToSerDeClassName.put("test_src", DEFAULT_SERDE); |
| topicsToSerDeClassName.put("persistent://public/default/test_src", TopicSchema.DEFAULT_SERDE); |
| } |
| |
| protected PulsarWorkerService mockedWorkerService; |
| protected PulsarAdmin mockedPulsarAdmin; |
| protected Tenants mockedTenants; |
| protected Namespaces mockedNamespaces; |
| protected Functions mockedFunctions; |
| protected TenantInfoImpl mockedTenantInfo; |
| protected List<String> namespaceList = new LinkedList<>(); |
| protected FunctionMetaDataManager mockedManager; |
| protected FunctionRuntimeManager mockedFunctionRunTimeManager; |
| protected RuntimeFactory mockedRuntimeFactory; |
| protected Namespace mockedNamespace; |
| protected InputStream mockedInputStream; |
| protected FormDataContentDisposition mockedFormData; |
| protected Function.FunctionMetaData mockedFunctionMetaData; |
| protected LeaderService mockedLeaderService; |
| protected Packages mockedPackages; |
| protected PulsarFunctionTestTemporaryDirectory tempDirectory; |
| protected ConnectorsManager connectorsManager = new ConnectorsManager(); |
| protected FunctionsManager functionsManager = new FunctionsManager(); |
| |
| public static File getPulsarIOCassandraNar() { |
| return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH) |
| , "pulsar-io-cassandra.nar file location must be specified with " |
| + SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH + " system property")); |
| } |
| |
| public static File getPulsarIOTwitterNar() { |
| return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH) |
| , "pulsar-io-twitter.nar file location must be specified with " |
| + SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH + " system property")); |
| } |
| |
| public static File getPulsarIOInvalidNar() { |
| return new File(Objects.requireNonNull(System.getProperty(SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH) |
| , "invalid nar file location must be specified with " |
| + SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH + " system property")); |
| } |
| |
| public static File getPulsarApiExamplesNar() { |
| return new File(Objects.requireNonNull( |
| System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH) |
| , "pulsar-functions-api-examples.nar file location must be specified with " |
| + SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH + " system property")); |
| } |
| |
| @BeforeMethod |
| public final void setup(Method method) throws Exception { |
| this.mockedManager = mock(FunctionMetaDataManager.class); |
| this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class); |
| this.mockedRuntimeFactory = mock(RuntimeFactory.class); |
| this.mockedInputStream = mock(InputStream.class); |
| this.mockedNamespace = mock(Namespace.class); |
| this.mockedFormData = mock(FormDataContentDisposition.class); |
| when(mockedFormData.getFileName()).thenReturn("test"); |
| this.mockedTenantInfo = mock(TenantInfoImpl.class); |
| this.mockedPulsarAdmin = mock(PulsarAdmin.class); |
| this.mockedTenants = mock(Tenants.class); |
| this.mockedNamespaces = mock(Namespaces.class); |
| this.mockedFunctions = mock(Functions.class); |
| this.mockedLeaderService = mock(LeaderService.class); |
| this.mockedPackages = mock(Packages.class); |
| namespaceList.add(tenant + "/" + namespace); |
| |
| this.mockedWorkerService = mock(PulsarWorkerService.class); |
| when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager); |
| when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService); |
| when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager); |
| when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory); |
| when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace); |
| when(mockedWorkerService.isInitialized()).thenReturn(true); |
| when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin); |
| when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin); |
| when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants); |
| when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces); |
| when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions); |
| when(mockedPulsarAdmin.packages()).thenReturn(mockedPackages); |
| when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo); |
| when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList); |
| when(mockedLeaderService.isLeader()).thenReturn(true); |
| doAnswer(invocationOnMock -> { |
| Files.copy(getDefaultNarFile().toPath(), Paths.get(invocationOnMock.getArgument(1, String.class)), |
| StandardCopyOption.REPLACE_EXISTING); |
| return null; |
| }).when(mockedPackages).download(any(), any()); |
| |
| // worker config |
| List<String> urlPatterns = |
| List.of("http://localhost.*", "file:.*", "https://repo1.maven.org/maven2/org/apache/pulsar/.*"); |
| WorkerConfig workerConfig = new WorkerConfig() |
| .setWorkerId("test") |
| .setWorkerPort(8080) |
| .setFunctionMetadataTopicName("pulsar/functions") |
| .setNumFunctionPackageReplicas(3) |
| .setPulsarServiceUrl("pulsar://localhost:6650/") |
| .setAdditionalEnabledFunctionsUrlPatterns(urlPatterns) |
| .setAdditionalEnabledConnectorUrlPatterns(urlPatterns) |
| .setFunctionsWorkerEnablePackageManagement(true); |
| customizeWorkerConfig(workerConfig, method); |
| tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName()); |
| tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig); |
| when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig); |
| when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager); |
| when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager); |
| PackageUrlValidator packageUrlValidator = new PackageUrlValidator(workerConfig); |
| when(mockedWorkerService.getPackageUrlValidator()).thenReturn(packageUrlValidator); |
| |
| doSetup(); |
| } |
| |
| protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) { |
| |
| } |
| |
| protected File getDefaultNarFile() { |
| return getPulsarIOTwitterNar(); |
| } |
| |
| protected void doSetup() throws Exception { |
| |
| } |
| |
| @AfterMethod(alwaysRun = true) |
| public void cleanup() { |
| if (tempDirectory != null) { |
| tempDirectory.delete(); |
| } |
| mockStaticContexts.values().forEach(MockedStatic::close); |
| mockStaticContexts.clear(); |
| } |
| |
| protected <T> void mockStatic(Class<T> classStatic, Consumer<MockedStatic<T>> consumer) { |
| mockStatic(classStatic, withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS), consumer); |
| } |
| |
| private <T> void mockStatic(Class<T> classStatic, MockSettings mockSettings, Consumer<MockedStatic<T>> consumer) { |
| final MockedStatic<T> mockedStatic = mockStaticContexts.computeIfAbsent(classStatic.getName(), |
| name -> Mockito.mockStatic(classStatic, mockSettings)); |
| consumer.accept(mockedStatic); |
| } |
| |
| protected void mockWorkerUtils() { |
| mockWorkerUtils(null); |
| } |
| |
| protected void mockWorkerUtils(Consumer<MockedStatic<WorkerUtils>> consumer) { |
| mockStatic(WorkerUtils.class, withSettings(), ctx -> { |
| // make dumpToTmpFile return the nar file copy |
| ctx.when(() -> WorkerUtils.dumpToTmpFile(mockedInputStream)) |
| .thenAnswer(invocation -> { |
| Path tempFile = Files.createTempFile("test", ".nar"); |
| Files.copy(getPulsarApiExamplesNar().toPath(), tempFile, |
| StandardCopyOption.REPLACE_EXISTING); |
| return tempFile.toFile(); |
| }); |
| ctx.when(() -> WorkerUtils.dumpToTmpFile(any())) |
| .thenAnswer(Answers.CALLS_REAL_METHODS); |
| if (consumer != null) { |
| consumer.accept(ctx); |
| } |
| }); |
| } |
| |
| protected void mockInstanceUtils() { |
| mockStatic(InstanceUtils.class, ctx -> { |
| ctx.when(() -> InstanceUtils.calculateSubjectType(any())) |
| .thenReturn(getComponentType()); |
| }); |
| } |
| |
| protected abstract Function.FunctionDetails.ComponentType getComponentType(); |
| |
| public static class LoadedConnector extends Connector { |
| public LoadedConnector(ConnectorDefinition connectorDefinition) { |
| super(null, connectorDefinition, null, true); |
| } |
| |
| @Override |
| public ValidatableFunctionPackage getConnectorFunctionPackage() { |
| return new LoadedFunctionPackage(getClass().getClassLoader(), ConnectorDefinition.class, |
| getConnectorDefinition()); |
| } |
| } |
| |
| |
| protected void registerBuiltinConnector(String connectorType, String className) { |
| ConnectorDefinition connectorDefinition = null; |
| if (className != null) { |
| connectorDefinition = new ConnectorDefinition(); |
| // set source and sink class to the same to simplify the test |
| connectorDefinition.setSinkClass(className); |
| connectorDefinition.setSourceClass(className); |
| } |
| connectorsManager.addConnector(connectorType, new LoadedConnector(connectorDefinition)); |
| } |
| |
| protected void registerBuiltinConnector(String connectorType, File packageFile) { |
| ConnectorDefinition cntDef; |
| try { |
| cntDef = ConnectorUtils.getConnectorDefinition(packageFile); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| connectorsManager.addConnector(connectorType, |
| new Connector(packageFile.toPath(), cntDef, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, true)); |
| } |
| |
| public static class LoadedFunctionArchive extends FunctionArchive { |
| public LoadedFunctionArchive(FunctionDefinition functionDefinition) { |
| super(null, functionDefinition, null, true); |
| } |
| |
| @Override |
| public ValidatableFunctionPackage getFunctionPackage() { |
| return new LoadedFunctionPackage(getClass().getClassLoader(), FunctionDefinition.class, |
| getFunctionDefinition()); |
| } |
| } |
| |
| protected void registerBuiltinFunction(String functionType, String className) { |
| FunctionDefinition functionDefinition = null; |
| if (className != null) { |
| functionDefinition = new FunctionDefinition(); |
| functionDefinition.setFunctionClass(className); |
| } |
| functionsManager.addFunction(functionType, new LoadedFunctionArchive(functionDefinition)); |
| } |
| |
| protected void registerBuiltinFunction(String functionType, File packageFile) { |
| FunctionDefinition cntDef; |
| try { |
| cntDef = FunctionUtils.getFunctionDefinition(packageFile); |
| } catch (IOException e) { |
| throw new UncheckedIOException(e); |
| } |
| functionsManager.addFunction(functionType, |
| new FunctionArchive(packageFile.toPath(), cntDef, NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, true)); |
| } |
| } |