// blob: 51ca4c83f9e02b1473829efa3fa1b8c506d49250 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker.rest.api.v3;
import static org.apache.pulsar.functions.source.TopicSchema.DEFAULT_SERDE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.Packages;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionDefinition;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.LoadedFunctionPackage;
import org.apache.pulsar.functions.utils.ValidatableFunctionPackage;
import org.apache.pulsar.functions.utils.functions.FunctionArchive;
import org.apache.pulsar.functions.utils.functions.FunctionUtils;
import org.apache.pulsar.functions.utils.io.Connector;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.functions.worker.LeaderService;
import org.apache.pulsar.functions.worker.PackageUrlValidator;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.mockito.Answers;
import org.mockito.MockSettings;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
public abstract class AbstractFunctionsResourceTest {
// Tenant and namespace names shared by the tests; setup() registers "tenant/namespace"
// in namespaceList.
protected static final String tenant = "test-tenant";
protected static final String namespace = "test-namespace";
// Topic name -> SerDe class name mapping; filled by the static initializer below.
protected static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
protected static final String subscriptionName = "test-subscription";
// Fully qualified class name of the Cassandra string sink.
protected static final String CASSANDRA_STRING_SINK = "org.apache.pulsar.io.cassandra.CassandraStringSink";
protected static final int parallelism = 1;
// Names of the system properties that the getPulsar*Nar() helpers read to locate
// the NAR archives on disk.
private static final String SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH = "pulsar-io-cassandra.nar.path";
private static final String SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH = "pulsar-io-twitter.nar.path";
private static final String SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH = "pulsar-io-invalid.nar.path";
private static final String SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH =
"pulsar-functions-api-examples.nar.path";
// Static mocks created through mockStatic(...), keyed by the mocked class name.
// cleanup() closes every entry and empties the map after each test method.
// NOTE(review): the value type is the raw MockedStatic; tightening it would change the
// type of a protected field, so it is left as is.
protected static Map<String, MockedStatic> mockStaticContexts = new HashMap<>();
static {
topicsToSerDeClassName.put("test_src", DEFAULT_SERDE);
topicsToSerDeClassName.put("persistent://public/default/test_src", TopicSchema.DEFAULT_SERDE);
}
// Mocked worker service; setup() wires the other mocks below into it.
protected PulsarWorkerService mockedWorkerService;
// Admin client mock and the sub-APIs it hands out (all re-created in setup()).
protected PulsarAdmin mockedPulsarAdmin;
protected Tenants mockedTenants;
protected Namespaces mockedNamespaces;
protected Functions mockedFunctions;
protected TenantInfoImpl mockedTenantInfo;
// Returned by mockedNamespaces.getNamespaces(any()); setup() adds "tenant/namespace" to it.
protected List<String> namespaceList = new LinkedList<>();
protected FunctionMetaDataManager mockedManager;
protected FunctionRuntimeManager mockedFunctionRunTimeManager;
protected RuntimeFactory mockedRuntimeFactory;
protected Namespace mockedNamespace;
// Mocked upload stream; mockWorkerUtils(...) stubs WorkerUtils.dumpToTmpFile for this instance.
protected InputStream mockedInputStream;
// Mocked multipart metadata; setup() makes getFileName() return "test".
protected FormDataContentDisposition mockedFormData;
// Not assigned by setup(); presumably populated by subclasses - verify against callers.
protected Function.FunctionMetaData mockedFunctionMetaData;
protected LeaderService mockedLeaderService;
protected Packages mockedPackages;
// Created in setup() and deleted in cleanup().
protected PulsarFunctionTestTemporaryDirectory tempDirectory;
// Real (non-mock) managers returned by the mocked worker service; the
// registerBuiltin*() helpers add entries to them.
protected ConnectorsManager connectorsManager = new ConnectorsManager();
protected FunctionsManager functionsManager = new FunctionsManager();
/**
 * Resolves the pulsar-io-cassandra NAR archive from the
 * {@code pulsar-io-cassandra.nar.path} system property.
 *
 * @return a {@link File} pointing at the configured path
 * @throws NullPointerException if the system property is not set
 */
public static File getPulsarIOCassandraNar() {
    final String narPath = System.getProperty(SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH);
    final String missingPropertyMessage = "pulsar-io-cassandra.nar file location must be specified with "
            + SYSTEM_PROPERTY_NAME_CASSANDRA_NAR_FILE_PATH + " system property";
    Objects.requireNonNull(narPath, missingPropertyMessage);
    return new File(narPath);
}
/**
 * Resolves the pulsar-io-twitter NAR archive from the
 * {@code pulsar-io-twitter.nar.path} system property.
 *
 * @return a {@link File} pointing at the configured path
 * @throws NullPointerException if the system property is not set
 */
public static File getPulsarIOTwitterNar() {
    final String narPath = System.getProperty(SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH);
    final String missingPropertyMessage = "pulsar-io-twitter.nar file location must be specified with "
            + SYSTEM_PROPERTY_NAME_TWITTER_NAR_FILE_PATH + " system property";
    Objects.requireNonNull(narPath, missingPropertyMessage);
    return new File(narPath);
}
/**
 * Resolves the deliberately invalid NAR archive from the
 * {@code pulsar-io-invalid.nar.path} system property.
 *
 * @return a {@link File} pointing at the configured path
 * @throws NullPointerException if the system property is not set
 */
public static File getPulsarIOInvalidNar() {
    final String narPath = System.getProperty(SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH);
    final String missingPropertyMessage = "invalid nar file location must be specified with "
            + SYSTEM_PROPERTY_NAME_INVALID_NAR_FILE_PATH + " system property";
    Objects.requireNonNull(narPath, missingPropertyMessage);
    return new File(narPath);
}
/**
 * Resolves the pulsar-functions-api-examples NAR archive from the
 * {@code pulsar-functions-api-examples.nar.path} system property.
 *
 * @return a {@link File} pointing at the configured path
 * @throws NullPointerException if the system property is not set
 */
public static File getPulsarApiExamplesNar() {
    final String narPath = System.getProperty(SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH);
    final String missingPropertyMessage =
            "pulsar-functions-api-examples.nar file location must be specified with "
                    + SYSTEM_PROPERTY_NAME_FUNCTIONS_API_EXAMPLES_NAR_FILE_PATH + " system property";
    Objects.requireNonNull(narPath, missingPropertyMessage);
    return new File(narPath);
}
/**
 * Rebuilds the mocks and the worker configuration before each test method.
 *
 * <p>Steps, in order: create fresh mocks, wire them into the mocked worker service and
 * admin client, stub package download to copy {@link #getDefaultNarFile()} to the requested
 * path, build a {@link WorkerConfig} (which {@link #customizeWorkerConfig} may adjust),
 * point it at a fresh temporary directory, and finally call {@link #doSetup()}.
 *
 * @param method the {@link Method} handed to this configuration method; forwarded to
 *               {@link #customizeWorkerConfig}
 * @throws Exception if stubbing, temporary directory creation or {@link #doSetup()} fails
 */
@BeforeMethod
public final void setup(Method method) throws Exception {
    this.mockedManager = mock(FunctionMetaDataManager.class);
    this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
    this.mockedRuntimeFactory = mock(RuntimeFactory.class);
    this.mockedInputStream = mock(InputStream.class);
    this.mockedNamespace = mock(Namespace.class);
    this.mockedFormData = mock(FormDataContentDisposition.class);
    when(mockedFormData.getFileName()).thenReturn("test");
    this.mockedTenantInfo = mock(TenantInfoImpl.class);
    this.mockedPulsarAdmin = mock(PulsarAdmin.class);
    this.mockedTenants = mock(Tenants.class);
    this.mockedNamespaces = mock(Namespaces.class);
    this.mockedFunctions = mock(Functions.class);
    this.mockedLeaderService = mock(LeaderService.class);
    this.mockedPackages = mock(Packages.class);

    // namespaceList is created once by its field initializer, while this method runs before
    // every test method. Adding unconditionally would append a duplicate on each run, so the
    // entry is only added when it is not there yet.
    final String qualifiedNamespace = tenant + "/" + namespace;
    if (!namespaceList.contains(qualifiedNamespace)) {
        namespaceList.add(qualifiedNamespace);
    }

    // worker service wiring
    this.mockedWorkerService = mock(PulsarWorkerService.class);
    when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
    when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
    when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
    when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
    when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
    when(mockedWorkerService.isInitialized()).thenReturn(true);
    // the same admin mock serves as both broker admin and function admin
    when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
    when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);

    // admin client wiring
    when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
    when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
    when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
    when(mockedPulsarAdmin.packages()).thenReturn(mockedPackages);
    when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
    when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
    when(mockedLeaderService.isLeader()).thenReturn(true);

    // "downloading" a package copies the default NAR file to the destination path that is
    // passed as the second argument of download(...)
    doAnswer(invocationOnMock -> {
        Files.copy(getDefaultNarFile().toPath(), Paths.get(invocationOnMock.getArgument(1, String.class)),
                StandardCopyOption.REPLACE_EXISTING);
        return null;
    }).when(mockedPackages).download(any(), any());

    // worker config
    List<String> urlPatterns =
            List.of("http://localhost.*", "file:.*", "https://repo1.maven.org/maven2/org/apache/pulsar/.*");
    WorkerConfig workerConfig = new WorkerConfig()
            .setWorkerId("test")
            .setWorkerPort(8080)
            .setFunctionMetadataTopicName("pulsar/functions")
            .setNumFunctionPackageReplicas(3)
            .setPulsarServiceUrl("pulsar://localhost:6650/")
            .setAdditionalEnabledFunctionsUrlPatterns(urlPatterns)
            .setAdditionalEnabledConnectorUrlPatterns(urlPatterns)
            .setFunctionsWorkerEnablePackageManagement(true);
    // subclass hook runs before the temporary directories are applied to the config
    customizeWorkerConfig(workerConfig, method);
    tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
    tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
    when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
    when(mockedWorkerService.getFunctionsManager()).thenReturn(functionsManager);
    when(mockedWorkerService.getConnectorsManager()).thenReturn(connectorsManager);
    PackageUrlValidator packageUrlValidator = new PackageUrlValidator(workerConfig);
    when(mockedWorkerService.getPackageUrlValidator()).thenReturn(packageUrlValidator);

    // subclass hook runs last, once everything above is in place
    doSetup();
}
/**
 * Hook for subclasses to adjust the worker configuration. The default implementation
 * does nothing.
 *
 * <p>{@code setup} calls this after the base settings have been applied to the config and
 * before the temporary directories are applied and the config is returned by the mocked
 * worker service.
 *
 * @param workerConfig the configuration being built; may be modified in place
 * @param method the {@link Method} argument that {@code setup} received
 */
protected void customizeWorkerConfig(WorkerConfig workerConfig, Method method) {
}
/**
 * Supplies the NAR file that the stubbed package download in {@code setup} copies to the
 * requested destination. Defaults to the pulsar-io-twitter NAR; subclasses may override.
 *
 * @return the NAR file to serve for package downloads
 */
protected File getDefaultNarFile() {
    final File twitterNar = getPulsarIOTwitterNar();
    return twitterNar;
}
/**
 * Hook for subclass-specific initialization. {@code setup} calls it as its last step, after
 * all mocks and the worker configuration are in place. The default implementation does
 * nothing.
 *
 * @throws Exception if the subclass initialization fails
 */
protected void doSetup() throws Exception {
}
/**
 * Releases per-test resources: deletes the temporary directory (if one was created) and
 * closes and forgets every static mock registered through {@code mockStatic}.
 *
 * <p>The static mocks are closed in a {@code finally} block so that a failure while deleting
 * the temporary directory cannot leave them registered for subsequent tests. The registry is
 * cleared even if closing one of the mocks fails.
 */
@AfterMethod(alwaysRun = true)
public void cleanup() {
    try {
        if (tempDirectory != null) {
            tempDirectory.delete();
        }
    } finally {
        try {
            mockStaticContexts.values().forEach(MockedStatic::close);
        } finally {
            mockStaticContexts.clear();
        }
    }
}
/**
 * Registers (or reuses) a static mock of {@code classStatic} whose default answer is
 * {@code Mockito.CALLS_REAL_METHODS}, then lets {@code consumer} configure it.
 *
 * @param classStatic the class whose static methods are mocked
 * @param consumer callback that receives the static mock for stubbing
 * @param <T> the mocked class type
 */
protected <T> void mockStatic(Class<T> classStatic, Consumer<MockedStatic<T>> consumer) {
    final MockSettings realMethodsByDefault = withSettings().defaultAnswer(Mockito.CALLS_REAL_METHODS);
    mockStatic(classStatic, realMethodsByDefault, consumer);
}
/**
 * Looks up the static mock registered under the class name, creating it with the given
 * settings when there is none yet, and passes it to {@code consumer}.
 *
 * <p>When a mock for the class already exists, it is reused and {@code mockSettings} is not
 * applied again.
 *
 * @param classStatic the class whose static methods are mocked
 * @param mockSettings settings used only when the static mock has to be created
 * @param consumer callback that receives the static mock for stubbing
 * @param <T> the mocked class type
 */
private <T> void mockStatic(Class<T> classStatic, MockSettings mockSettings, Consumer<MockedStatic<T>> consumer) {
    final String contextKey = classStatic.getName();
    // The registry stores raw MockedStatic values; the entry under this key is always the
    // one created for classStatic, so narrowing it to MockedStatic<T> is safe.
    @SuppressWarnings("unchecked")
    final MockedStatic<T> context = (MockedStatic<T>) mockStaticContexts.computeIfAbsent(contextKey,
            ignoredKey -> Mockito.mockStatic(classStatic, mockSettings));
    consumer.accept(context);
}
/**
 * Installs the default {@code WorkerUtils} static mock without any additional stubbing.
 */
protected void mockWorkerUtils() {
    final Consumer<MockedStatic<WorkerUtils>> noExtraStubbing = null;
    mockWorkerUtils(noExtraStubbing);
}
/**
 * Installs a static mock of {@code WorkerUtils} with stubbings for {@code dumpToTmpFile} and
 * then lets the caller add further stubbings.
 *
 * <p>Two stubbings are registered: one for {@code dumpToTmpFile(mockedInputStream)} that
 * answers with a fresh temporary copy of the functions API examples NAR, and one for
 * {@code dumpToTmpFile(any())} that calls the real method.
 *
 * @param consumer optional callback for additional stubbing; may be {@code null}
 */
protected void mockWorkerUtils(Consumer<MockedStatic<WorkerUtils>> consumer) {
    mockStatic(WorkerUtils.class, withSettings(), ctx -> {
        // make dumpToTmpFile return the nar file copy
        ctx.when(() -> WorkerUtils.dumpToTmpFile(mockedInputStream))
                .thenAnswer(invocation -> {
                    Path tempFile = Files.createTempFile("test", ".nar");
                    // Nothing in this helper removes the copy, so ask the JVM to delete it
                    // on exit instead of leaving it behind in the system temp directory.
                    tempFile.toFile().deleteOnExit();
                    Files.copy(getPulsarApiExamplesNar().toPath(), tempFile,
                            StandardCopyOption.REPLACE_EXISTING);
                    return tempFile.toFile();
                });
        // NOTE(review): Mockito gives precedence to the most recently registered stubbing
        // that matches an invocation. Confirm that this any() fallback does not shadow the
        // mockedInputStream stubbing above; the original registration order is kept here.
        ctx.when(() -> WorkerUtils.dumpToTmpFile(any()))
                .thenAnswer(Answers.CALLS_REAL_METHODS);
        if (consumer != null) {
            consumer.accept(ctx);
        }
    });
}
/**
 * Installs a static mock of {@code InstanceUtils} in which {@code calculateSubjectType}
 * returns {@link #getComponentType()} for any argument. All other static methods keep
 * their real behavior, because the two-argument {@code mockStatic} overload is used.
 */
protected void mockInstanceUtils() {
    final Consumer<MockedStatic<InstanceUtils>> stubSubjectType = staticMock ->
            staticMock.when(() -> InstanceUtils.calculateSubjectType(any()))
                    .thenReturn(getComponentType());
    mockStatic(InstanceUtils.class, stubSubjectType);
}
/**
 * Identifies the component type covered by the concrete test class.
 * {@code mockInstanceUtils()} uses the returned value as the stubbed result of
 * {@code InstanceUtils.calculateSubjectType}.
 *
 * @return the component type under test
 */
protected abstract Function.FunctionDetails.ComponentType getComponentType();
/**
 * {@link Connector} backed only by an in-memory definition. It passes {@code null} to the
 * super constructor in the positions where the file-based registration passes the archive
 * path and the NAR extraction directory, and serves its function package from the class
 * loader that loaded this class.
 */
public static class LoadedConnector extends Connector {
    /**
     * @param connectorDefinition the definition to expose; may be {@code null}
     */
    public LoadedConnector(ConnectorDefinition connectorDefinition) {
        super(null, connectorDefinition, null, true);
    }

    /**
     * Builds a package view over this class's class loader and the connector definition
     * held by this connector.
     */
    @Override
    public ValidatableFunctionPackage getConnectorFunctionPackage() {
        final ClassLoader loader = getClass().getClassLoader();
        final ConnectorDefinition definition = getConnectorDefinition();
        return new LoadedFunctionPackage(loader, ConnectorDefinition.class, definition);
    }
}
/**
 * Registers a builtin connector that is backed by a class name instead of an archive.
 *
 * @param connectorType the name under which the connector is registered
 * @param className class used as both sink and source class; when {@code null}, the
 *                  connector is registered with a {@code null} definition
 */
protected void registerBuiltinConnector(String connectorType, String className) {
    final ConnectorDefinition definition;
    if (className == null) {
        definition = null;
    } else {
        definition = new ConnectorDefinition();
        // sink and source share one class to keep the tests simple
        definition.setSinkClass(className);
        definition.setSourceClass(className);
    }
    final LoadedConnector connector = new LoadedConnector(definition);
    connectorsManager.addConnector(connectorType, connector);
}
/**
 * Registers a builtin connector backed by an archive file; the connector definition is read
 * from the archive.
 *
 * @param connectorType the name under which the connector is registered
 * @param packageFile the connector archive
 * @throws UncheckedIOException if the connector definition cannot be read from the archive
 */
protected void registerBuiltinConnector(String connectorType, File packageFile) {
    final ConnectorDefinition definition;
    try {
        definition = ConnectorUtils.getConnectorDefinition(packageFile);
    } catch (IOException readFailure) {
        throw new UncheckedIOException(readFailure);
    }
    final Connector connector = new Connector(packageFile.toPath(), definition,
            NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, true);
    connectorsManager.addConnector(connectorType, connector);
}
/**
 * {@link FunctionArchive} backed only by an in-memory definition. It passes {@code null} to
 * the super constructor in the positions where the file-based registration passes the
 * archive path and the NAR extraction directory, and serves its function package from the
 * class loader that loaded this class.
 */
public static class LoadedFunctionArchive extends FunctionArchive {
    /**
     * @param functionDefinition the definition to expose; may be {@code null}
     */
    public LoadedFunctionArchive(FunctionDefinition functionDefinition) {
        super(null, functionDefinition, null, true);
    }

    /**
     * Builds a package view over this class's class loader and the function definition
     * held by this archive.
     */
    @Override
    public ValidatableFunctionPackage getFunctionPackage() {
        final ClassLoader loader = getClass().getClassLoader();
        final FunctionDefinition definition = getFunctionDefinition();
        return new LoadedFunctionPackage(loader, FunctionDefinition.class, definition);
    }
}
/**
 * Registers a builtin function that is backed by a class name instead of an archive.
 *
 * @param functionType the name under which the function is registered
 * @param className the function class name; when {@code null}, the function is registered
 *                  with a {@code null} definition
 */
protected void registerBuiltinFunction(String functionType, String className) {
    final FunctionDefinition definition;
    if (className == null) {
        definition = null;
    } else {
        definition = new FunctionDefinition();
        definition.setFunctionClass(className);
    }
    final LoadedFunctionArchive archive = new LoadedFunctionArchive(definition);
    functionsManager.addFunction(functionType, archive);
}
/**
 * Registers a builtin function backed by an archive file; the function definition is read
 * from the archive.
 *
 * @param functionType the name under which the function is registered
 * @param packageFile the function archive
 * @throws UncheckedIOException if the function definition cannot be read from the archive
 */
protected void registerBuiltinFunction(String functionType, File packageFile) {
    final FunctionDefinition functionDefinition;
    try {
        functionDefinition = FunctionUtils.getFunctionDefinition(packageFile);
    } catch (IOException readFailure) {
        throw new UncheckedIOException(readFailure);
    }
    final FunctionArchive archive = new FunctionArchive(packageFile.toPath(), functionDefinition,
            NarClassLoader.DEFAULT_NAR_EXTRACTION_DIR, true);
    functionsManager.addFunction(functionType, archive);
}
}