blob: cb5291bed7bc3e59a4adc52080e640038c1c1453 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.functions;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
/**
* A cluster to run pulsar functions for testing functions related features.
*/
@Slf4j
public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
@DataProvider(name = "FunctionRuntimeTypes")
public static Object[][] getData() {
return new Object[][] {
{ FunctionRuntimeType.PROCESS },
{ FunctionRuntimeType.THREAD }
};
}
protected final FunctionRuntimeType functionRuntimeType;
public PulsarFunctionsTestBase() {
this(FunctionRuntimeType.PROCESS);
}
protected PulsarFunctionsTestBase(FunctionRuntimeType functionRuntimeType) {
this.functionRuntimeType = functionRuntimeType;
}
@BeforeClass
public void setupFunctionWorkers() {
final int numFunctionWorkers = 2;
log.info("Setting up {} function workers : function runtime type = {}",
numFunctionWorkers, functionRuntimeType);
pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, numFunctionWorkers);
log.info("{} function workers has started", numFunctionWorkers);
}
@AfterClass
public void teardownFunctionWorkers() {
log.info("Tearing down function workers ...");
pulsarCluster.stopWorkers();
log.info("All functions workers are stopped.");
}
//
// Common Variables used by functions test
//
public static final String EXCLAMATION_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.ExclamationFunction";
public static final String PUBLISH_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.TypedMessageBuilderPublish";
public static final String EXCEPTION_JAVA_CLASS =
"org.apache.pulsar.tests.integration.functions.ExceptionFunction";
public static final String SERDE_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseToBaseFunction";
public static final String SERDE_OUTPUT_CLASS =
"org.apache.pulsar.functions.api.examples.CustomBaseSerde";
public static final String EXCLAMATION_PYTHON_CLASS =
"exclamation_function.ExclamationFunction";
public static final String EXCLAMATION_WITH_DEPS_PYTHON_CLASS =
"exclamation_with_extra_deps.ExclamationFunction";
public static final String EXCLAMATION_PYTHON_ZIP_CLASS =
"exclamation";
public static final String PUBLISH_PYTHON_CLASS = "typed_message_builder_publish.TypedMessageBuilderPublish";
public static final String EXCEPTION_PYTHON_CLASS = "exception_function";
public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
public static final String EXCLAMATION_PYTHON_ZIP_FILE = "exclamation.zip";
public static final String PUBLISH_FUNCTION_PYTHON_FILE = "typed_message_builder_publish.py";
public static final String EXCEPTION_FUNCTION_PYTHON_FILE = "exception_function.py";
protected static String getExclamationClass(Runtime runtime,
boolean pyZip,
boolean extraDeps) {
if (Runtime.JAVA == runtime) {
return EXCLAMATION_JAVA_CLASS;
} else if (Runtime.PYTHON == runtime) {
if (pyZip) {
return EXCLAMATION_PYTHON_ZIP_CLASS;
} else if (extraDeps) {
return EXCLAMATION_WITH_DEPS_PYTHON_CLASS;
} else {
return EXCLAMATION_PYTHON_CLASS;
}
} else {
throw new IllegalArgumentException("Unsupported runtime : " + runtime);
}
}
@DataProvider(name = "FunctionRuntimes")
public static Object[][] functionRuntimes() {
return new Object[][] {
new Object[] { Runtime.JAVA },
new Object[] { Runtime.PYTHON }
};
}
}