blob: 29fb11c618fba460c7a535368d55b62b13cda255 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker.rest.api.v2;
import static org.apache.pulsar.functions.utils.FunctionCommon.mergeJson;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SubscriptionType;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.LeaderService;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
import org.apache.pulsar.functions.worker.rest.api.PulsarFunctionTestTemporaryDirectory;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.IObjectFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
/**
* Unit test of {@link FunctionsApiV2Resource}.
*/
@PrepareForTest({WorkerUtils.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.functions.api.*" })
public class FunctionApiV2ResourceTest {
@ObjectFactory
public IObjectFactory getObjectFactory() {
return new org.powermock.modules.testng.PowerMockObjectFactory();
}
private static final class TestFunction implements Function<String, String> {
@Override
public String process(String input, Context context) {
return input;
}
}
private static final String tenant = "test-tenant";
private static final String namespace = "test-namespace";
private static final String function = "test-function";
private static final String outputTopic = "test-output-topic";
private static final String outputSerdeClassName = TopicSchema.DEFAULT_SERDE;
private static final String className = TestFunction.class.getName();
private SubscriptionType subscriptionType = SubscriptionType.FAILOVER;
private static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
static {
topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", TopicSchema.DEFAULT_SERDE);
}
private static final int parallelism = 1;
private PulsarWorkerService mockedWorkerService;
private PulsarAdmin mockedPulsarAdmin;
private Tenants mockedTenants;
private Namespaces mockedNamespaces;
private Functions mockedFunctions;
private TenantInfo mockedTenantInfo;
private List<String> namespaceList = new LinkedList<>();
private FunctionMetaDataManager mockedManager;
private FunctionRuntimeManager mockedFunctionRunTimeManager;
private RuntimeFactory mockedRuntimeFactory;
private Namespace mockedNamespace;
private FunctionsImplV2 resource;
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetadata;
private LeaderService mockedLeaderService;
private PulsarFunctionTestTemporaryDirectory tempDirectory;
@BeforeMethod
public void setup() throws Exception {
this.mockedManager = mock(FunctionMetaDataManager.class);
this.mockedFunctionRunTimeManager = mock(FunctionRuntimeManager.class);
this.mockedTenantInfo = mock(TenantInfo.class);
this.mockedRuntimeFactory = mock(RuntimeFactory.class);
this.mockedInputStream = mock(InputStream.class);
this.mockedNamespace = mock(Namespace.class);
this.mockedFormData = mock(FormDataContentDisposition.class);
when(mockedFormData.getFileName()).thenReturn("test");
this.mockedPulsarAdmin = mock(PulsarAdmin.class);
this.mockedTenants = mock(Tenants.class);
this.mockedNamespaces = mock(Namespaces.class);
this.mockedFunctions = mock(Functions.class);
this.mockedLeaderService = mock(LeaderService.class);
this.mockedFunctionMetadata = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
namespaceList.add(tenant + "/" + namespace);
this.mockedWorkerService = mock(PulsarWorkerService.class);
when(mockedWorkerService.getFunctionMetaDataManager()).thenReturn(mockedManager);
when(mockedWorkerService.getLeaderService()).thenReturn(mockedLeaderService);
when(mockedWorkerService.getFunctionRuntimeManager()).thenReturn(mockedFunctionRunTimeManager);
when(mockedFunctionRunTimeManager.getRuntimeFactory()).thenReturn(mockedRuntimeFactory);
when(mockedWorkerService.getDlogNamespace()).thenReturn(mockedNamespace);
when(mockedWorkerService.isInitialized()).thenReturn(true);
when(mockedWorkerService.getBrokerAdmin()).thenReturn(mockedPulsarAdmin);
when(mockedWorkerService.getFunctionAdmin()).thenReturn(mockedPulsarAdmin);
when(mockedPulsarAdmin.tenants()).thenReturn(mockedTenants);
when(mockedPulsarAdmin.namespaces()).thenReturn(mockedNamespaces);
when(mockedPulsarAdmin.functions()).thenReturn(mockedFunctions);
when(mockedTenants.getTenantInfo(any())).thenReturn(mockedTenantInfo);
when(mockedNamespaces.getNamespaces(any())).thenReturn(namespaceList);
when(mockedLeaderService.isLeader()).thenReturn(true);
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetadata);
// worker config
WorkerConfig workerConfig = new WorkerConfig()
.setWorkerId("test")
.setWorkerPort(8080)
.setFunctionMetadataTopicName("pulsar/functions")
.setNumFunctionPackageReplicas(3)
.setPulsarServiceUrl("pulsar://localhost:6650/");
tempDirectory = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
tempDirectory.useTemporaryDirectoriesForWorkerConfig(workerConfig);
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
mockStatic(InstanceUtils.class);
PowerMockito.when(InstanceUtils.calculateSubjectType(any())).thenReturn(FunctionDetails.ComponentType.FUNCTION);
this.resource = spy(new FunctionsImplV2(functions));
}
@AfterMethod(alwaysRun = true)
public void cleanup() {
if (tempDirectory != null) {
tempDirectory.delete();
}
}
//
// Register Functions
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
public void testRegisterFunctionMissingTenant() {
try {
testRegisterFunctionMissingArguments(
null,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
public void testRegisterFunctionMissingNamespace() {
try {
testRegisterFunctionMissingArguments(
tenant,
null,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
public void testRegisterFunctionMissingFunctionName() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
null,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
public void testRegisterFunctionMissingPackage() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "No input topic\\(s\\) specified for the function")
public void testRegisterFunctionMissingInputTopics() throws Exception {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
null,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
public void testRegisterFunctionMissingPackageDetails() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
null,
outputTopic,
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function classname cannot be null")
public void testRegisterFunctionMissingClassName() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
null,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function class UnknownClass must be in class path")
public void testRegisterFunctionWrongClassName() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
"UnknownClass",
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function parallelism must be a positive number")
public void testRegisterFunctionWrongParallelism() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
-2,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class,
expectedExceptionsMessageRegExp = "Output topic persistent://sample/standalone/ns1/test_src is also being used as an input topic \\(topics must be one or the other\\)")
public void testRegisterFunctionSameInputOutput() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
topicsToSerDeClassName.keySet().iterator().next(),
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topic " + function + "-output-topic/test:" + " is invalid")
public void testRegisterFunctionWrongOutputTopic() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
function + "-output-topic/test:",
outputSerdeClassName,
className,
parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Encountered error .*. when getting Function package from .*")
public void testRegisterFunctionHttpUrl() {
try {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
null,
outputTopic,
outputSerdeClassName,
className,
parallelism,
"http://localhost:1234/test");
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
private void testRegisterFunctionMissingArguments(
String tenant,
String namespace,
String function,
InputStream inputStream,
Map<String, String> topicsToSerDeClassName,
FormDataContentDisposition details,
String outputTopic,
String outputSerdeClassName,
String className,
Integer parallelism,
String functionPkgUrl) {
FunctionConfig functionConfig = new FunctionConfig();
if (tenant != null) {
functionConfig.setTenant(tenant);
}
if (namespace != null) {
functionConfig.setNamespace(namespace);
}
if (function != null) {
functionConfig.setName(function);
}
if (topicsToSerDeClassName != null) {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
}
if (outputTopic != null) {
functionConfig.setOutput(outputTopic);
}
if (outputSerdeClassName != null) {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
}
if (className != null) {
functionConfig.setClassName(className);
}
if (parallelism != null) {
functionConfig.setParallelism(parallelism);
}
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
mockStatic(WorkerUtils.class);
try {
WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
resource.registerFunction(
tenant,
namespace,
function,
inputStream,
details,
functionPkgUrl,
JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
private void registerDefaultFunction() {
FunctionConfig functionConfig = createDefaultFunctionConfig();
try {
resource.registerFunction(
tenant,
namespace,
function,
mockedInputStream,
mockedFormData,
null,
JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function already exists")
public void testRegisterExistedFunction() {
try {
Configurator.setRootLevel(Level.DEBUG);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testRegisterFunctionUploadFailure() throws Exception {
try {
mockStatic(WorkerUtils.class);
doThrow(new IOException("upload failure")).when(WorkerUtils.class);
WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
throw re;
}
}
@Test
public void testRegisterFunctionSuccess() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace does not exist")
public void testRegisterFunctionNonExistingNamespace() {
try {
this.namespaceList.clear();
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant does not exist")
public void testRegisterFunctionNonexistantTenant() throws Exception {
try {
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
public void testRegisterFunctionFailure() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
doThrow(new IllegalArgumentException("function failed to register"))
.when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registration interrupted")
public void testRegisterFunctionInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
doThrow(new IllegalStateException("Function registration interrupted"))
.when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
registerDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
throw re;
}
}
//
// Update Functions
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
public void testUpdateFunctionMissingTenant() throws Exception {
try {
testUpdateFunctionMissingArguments(
null,
namespace,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
"Tenant is not provided");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
public void testUpdateFunctionMissingNamespace() throws Exception {
try {
testUpdateFunctionMissingArguments(
tenant,
null,
function,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
"Namespace is not provided");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
public void testUpdateFunctionMissingFunctionName() throws Exception {
try {
testUpdateFunctionMissingArguments(
tenant,
namespace,
null,
mockedInputStream,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
"Function name is not provided");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingPackage() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
"Update contains no change");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingInputTopic() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
null,
mockedFormData,
outputTopic,
outputSerdeClassName,
className,
parallelism,
"Update contains no change");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
public void testUpdateFunctionMissingClassName() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
null,
parallelism,
"Update contains no change");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test
public void testUpdateFunctionChangedParallelism() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
outputTopic,
outputSerdeClassName,
null,
parallelism + 1,
null);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test
public void testUpdateFunctionChangedInputs() throws Exception {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
topicsToSerDeClassName,
mockedFormData,
"DifferentOutput",
outputSerdeClassName,
null,
parallelism,
null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
public void testUpdateFunctionChangedOutput() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
Map<String, String> someOtherInput = new HashMap<>();
someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
testUpdateFunctionMissingArguments(
tenant,
namespace,
function,
null,
someOtherInput,
mockedFormData,
outputTopic,
outputSerdeClassName,
null,
parallelism,
"Input Topics cannot be altered");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
private void testUpdateFunctionMissingArguments(
String tenant,
String namespace,
String function,
InputStream inputStream,
Map<String, String> topicsToSerDeClassName,
FormDataContentDisposition details,
String outputTopic,
String outputSerdeClassName,
String className,
Integer parallelism,
String expectedError) throws Exception {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
FunctionConfig functionConfig = new FunctionConfig();
if (tenant != null) {
functionConfig.setTenant(tenant);
}
if (namespace != null) {
functionConfig.setNamespace(namespace);
}
if (function != null) {
functionConfig.setName(function);
}
if (topicsToSerDeClassName != null) {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
}
if (outputTopic != null) {
functionConfig.setOutput(outputTopic);
}
if (outputSerdeClassName != null) {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
}
if (className != null) {
functionConfig.setClassName(className);
}
if (parallelism != null) {
functionConfig.setParallelism(parallelism);
}
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
if (expectedError != null) {
doThrow(new IllegalArgumentException(expectedError))
.when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
}
try {
resource.updateFunction(
tenant,
namespace,
function,
inputStream,
details,
null,
JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
private void updateDefaultFunction() {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(function);
functionConfig.setClassName(className);
functionConfig.setParallelism(parallelism);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
try {
resource.updateFunction(
tenant,
namespace,
function,
mockedInputStream,
mockedFormData,
null,
JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
public void testUpdateNotExistedFunction() {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
updateDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
public void testUpdateFunctionUploadFailure() throws Exception {
try {
mockStatic(WorkerUtils.class);
doThrow(new IOException("upload failure")).when(WorkerUtils.class);
WorkerUtils.uploadFileToBookkeeper(
anyString(),
any(File.class),
any(Namespace.class));
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
updateDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
throw re;
}
}
@Test
public void testUpdateFunctionSuccess() throws Exception {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
updateDefaultFunction();
}
@Test
public void testUpdateFunctionWithUrl() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath();
String filePackageUrl = "file://" + fileLocation;
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(function);
functionConfig.setClassName(className);
functionConfig.setParallelism(parallelism);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
try {
resource.updateFunction(
tenant,
namespace,
function,
null,
null,
filePackageUrl,
JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)),
null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
public void testUpdateFunctionFailure() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
doThrow(new IllegalArgumentException("function failed to register"))
.when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
updateDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function registeration interrupted")
public void testUpdateFunctionInterrupted() throws Exception {
try {
mockStatic(WorkerUtils.class);
doNothing().when(WorkerUtils.class);
WorkerUtils.uploadToBookeeper(
any(Namespace.class),
any(InputStream.class),
anyString());
PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
doThrow(new IllegalStateException("Function registeration interrupted"))
.when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
updateDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
throw re;
}
}
//
// deregister function
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
public void testDeregisterFunctionMissingTenant() {
try {
testDeregisterFunctionMissingArguments(
null,
namespace,
function
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
public void testDeregisterFunctionMissingNamespace() {
try {
testDeregisterFunctionMissingArguments(
tenant,
null,
function
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
public void testDeregisterFunctionMissingFunctionName() {
try {
testDeregisterFunctionMissingArguments(
tenant,
namespace,
null
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
private void testDeregisterFunctionMissingArguments(
String tenant,
String namespace,
String function
) {
resource.deregisterFunction(
tenant,
namespace,
function,
null);
}
private void deregisterDefaultFunction() {
resource.deregisterFunction(
tenant,
namespace,
function,
null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
public void testDeregisterNotExistedFunction() {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
deregisterDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
throw re;
}
}
@Test
public void testDeregisterFunctionSuccess() {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
deregisterDefaultFunction();
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to deregister")
public void testDeregisterFunctionFailure() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
doThrow(new IllegalArgumentException("function failed to deregister"))
.when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
deregisterDefaultFunction();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregisteration interrupted")
public void testDeregisterFunctionInterrupted() throws Exception {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
doThrow(new IllegalStateException("Function deregisteration interrupted"))
.when(mockedManager).updateFunctionOnLeader(any(FunctionMetaData.class), anyBoolean());
deregisterDefaultFunction();
}
catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
throw re;
}
}
//
// Get Function Info
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
public void testGetFunctionMissingTenant() throws IOException {
try {
testGetFunctionMissingArguments(
null,
namespace,
function
);
}
catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
public void testGetFunctionMissingNamespace() throws IOException {
try {
testGetFunctionMissingArguments(
tenant,
null,
function
);
}
catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function name is not provided")
public void testGetFunctionMissingFunctionName() throws IOException {
try {
testGetFunctionMissingArguments(
tenant,
namespace,
null
);
}
catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
private void testGetFunctionMissingArguments(
String tenant,
String namespace,
String function
) throws IOException {
resource.getFunctionInfo(
tenant,
namespace,
function, null
);
}
private FunctionDetails getDefaultFunctionInfo() throws IOException {
String json = (String) resource.getFunctionInfo(
tenant,
namespace,
function, null
).getEntity();
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
mergeJson(json, functionDetailsBuilder);
return functionDetailsBuilder.build();
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
public void testGetNotExistedFunction() throws IOException {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
getDefaultFunctionInfo();
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
throw re;
}
}
@Test
public void testGetFunctionSuccess() throws IOException {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
SinkSpec sinkSpec = SinkSpec.newBuilder()
.setTopic(outputTopic)
.setSerDeClassName(outputSerdeClassName).build();
FunctionDetails functionDetails = FunctionDetails.newBuilder()
.setClassName(className)
.setSink(sinkSpec)
.setName(function)
.setNamespace(namespace)
.setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
.setTenant(tenant)
.setParallelism(parallelism)
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
FunctionMetaData metaData = FunctionMetaData.newBuilder()
.setCreateTime(System.currentTimeMillis())
.setFunctionDetails(functionDetails)
.setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
.setVersion(1234)
.build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
FunctionDetails actual = getDefaultFunctionInfo();
assertEquals(
functionDetails,
actual);
}
//
// List Functions
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
public void testListFunctionsMissingTenant() {
try {
testListFunctionsMissingArguments(
null,
namespace
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
public void testListFunctionsMissingNamespace() {
try {
testListFunctionsMissingArguments(
tenant,
null
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
}
}
private void testListFunctionsMissingArguments(
String tenant,
String namespace
) {
resource.listFunctions(
tenant,
namespace, null
);
}
private List<String> listDefaultFunctions() {
return new Gson().fromJson((String) resource.listFunctions(
tenant,
namespace, null
).getEntity(), List.class);
}
@Test
public void testListFunctionsSuccess() {
final List<String> functions = Lists.newArrayList("test-1", "test-2");
final List<FunctionMetaData> metaDataList = new LinkedList<>();
FunctionMetaData functionMetaData1 = FunctionMetaData.newBuilder().setFunctionDetails(
FunctionDetails.newBuilder().setName("test-1").build()
).build();
FunctionMetaData functionMetaData2 = FunctionMetaData.newBuilder().setFunctionDetails(
FunctionDetails.newBuilder().setName("test-2").build()
).build();
metaDataList.add(functionMetaData1);
metaDataList.add(functionMetaData2);
when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(metaDataList);
List<String> functionList = listDefaultFunctions();
assertEquals(functions, functionList);
}
@Test
public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl = "https://repo1.maven.org/maven2/org/apache/pulsar/pulsar-common/2.4.2/pulsar-common-2.4.2.jar";
String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl, null).getEntity();
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
}
@Test
public void testDownloadFunctionFile() throws Exception {
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath();
String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation, null).getEntity();
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
Assert.assertTrue(pkgFile.exists());
if (pkgFile.exists()) {
pkgFile.delete();
}
}
@Test
public void testRegisterFunctionFileUrlWithValidSinkClass() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath();
String filePackageUrl = "file://" + fileLocation;
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(function);
functionConfig.setClassName(className);
functionConfig.setParallelism(parallelism);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
try {
resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)), null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
@Test
public void testRegisterFunctionWithConflictingFields() throws Exception {
Configurator.setRootLevel(Level.DEBUG);
String actualTenant = "DIFFERENT_TENANT";
String actualNamespace = "DIFFERENT_NAMESPACE";
String actualName = "DIFFERENT_NAME";
this.namespaceList.add(actualTenant + "/" + actualNamespace);
URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml");
File file = Paths.get(fileUrl.toURI()).toFile();
String fileLocation = file.getAbsolutePath();
String filePackageUrl = "file://" + fileLocation;
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false);
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(function);
functionConfig.setClassName(className);
functionConfig.setParallelism(parallelism);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
try {
resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
JsonFormat.printer().print(FunctionConfigUtils.convert(functionConfig, null)), null);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
}
public static FunctionConfig createDefaultFunctionConfig() {
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(function);
functionConfig.setClassName(className);
functionConfig.setParallelism(parallelism);
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
return functionConfig;
}
public static FunctionDetails createDefaultFunctionDetails() {
FunctionConfig functionConfig = createDefaultFunctionConfig();
return FunctionConfigUtils.convert(functionConfig, null);
}
}