blob: e7c6dddc9b2cf9a97abd319bb6a0793acf3acec2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import static org.testng.Assert.assertEquals;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "functions-worker")
public class PulsarFunctionTlsTest {
protected static final int BROKER_COUNT = 2;
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
private static final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
private static final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
LocalBookkeeperEnsemble bkEnsemble;
protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT];
protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT];
protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT];
protected PulsarService leaderPulsar;
protected PulsarAdmin leaderAdmin;
protected WorkerService[] fnWorkerServices = new WorkerService[BROKER_COUNT];
protected String testCluster = "my-cluster";
protected String testTenant = "my-tenant";
protected String testNamespace = testTenant + "/my-ns";
private PulsarFunctionTestTemporaryDirectory[] tempDirectories = new PulsarFunctionTestTemporaryDirectory[BROKER_COUNT];
@BeforeMethod(alwaysRun = true)
void setup() throws Exception {
log.info("---- Initializing TopicOwnerTest -----");
// Start local bookkeeper ensemble
bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
bkEnsemble.start();
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
int brokerPort = PortManager.nextFreePort();
int webPort = PortManager.nextFreePort();
ServiceConfiguration config = new ServiceConfiguration();
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setWebServicePort(Optional.empty());
config.setWebServicePortTls(Optional.of(webPort));
config.setBrokerServicePort(Optional.empty());
config.setBrokerServicePortTls(Optional.of(brokerPort));
config.setClusterName("my-cluster");
config.setAdvertisedAddress("localhost");
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setDefaultNumberOfNamespaceBundles(1);
config.setLoadBalancerEnabled(false);
Set<String> superUsers = Sets.newHashSet("superUser", "admin");
config.setSuperUserRoles(superUsers);
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
config.setAuthenticationEnabled(true);
config.setAuthorizationEnabled(true);
config.setAuthenticationProviders(providers);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsAllowInsecureConnection(true);
config.setBrokerClientTlsEnabled(true);
config.setBrokerClientTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
config.setBrokerClientAuthenticationParameters(
"tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + ",tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
config.setFunctionsWorkerEnabled(true);
config.setTlsEnabled(true);
WorkerConfig workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(config, null);
tempDirectories[i] = PulsarFunctionTestTemporaryDirectory.create(getClass().getSimpleName());
tempDirectories[i].useTemporaryDirectoriesForWorkerConfig(workerConfig);
workerConfig.setPulsarFunctionsNamespace("public/functions");
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
workerConfig.setFunctionRuntimeFactoryConfigs(
ObjectMapperFactory.getThreadLocal().convertValue(
new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setFailureCheckFreqMs(100);
workerConfig.setNumFunctionPackageReplicas(1);
workerConfig.setClusterCoordinationTopicName("coordinate");
workerConfig.setFunctionAssignmentTopicName("assignment");
workerConfig.setFunctionMetadataTopicName("metadata");
workerConfig.setInstanceLivenessCheckFreqMs(100);
workerConfig.setBrokerClientAuthenticationEnabled(true);
workerConfig.setTlsEnabled(true);
workerConfig.setUseTls(true);
fnWorkerServices[i] = WorkerServiceLoader.load(workerConfig);
configurations[i] = config;
pulsarServices[i] = new PulsarService(
config, workerConfig, Optional.of(fnWorkerServices[i]), code -> {});
pulsarServices[i].start();
// Sleep until pulsarServices[0] becomes leader, this way we can spy namespace bundle assignment easily.
while (i == 0 && !pulsarServices[0].getLeaderElectionService().isLeader()) {
Thread.sleep(10);
}
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);
pulsarAdmins[i] = PulsarAdmin.builder()
.serviceHttpUrl(pulsarServices[i].getWebServiceAddressTls())
.tlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH)
.allowTlsInsecureConnection(true)
.authentication(authTls)
.build();
}
leaderPulsar = pulsarServices[0];
leaderAdmin = pulsarAdmins[0];
Thread.sleep(1000);
TenantInfo tenantInfo = TenantInfo.builder()
.allowedClusters(Collections.singleton(testCluster))
.build();
pulsarAdmins[0].tenants().createTenant(testTenant, tenantInfo);
pulsarAdmins[0].namespaces().createNamespace(testNamespace, 16);
}
@AfterMethod(alwaysRun = true)
void tearDown() throws Exception {
try {
for (int i = 0; i < BROKER_COUNT; i++) {
if (pulsarAdmins[i] != null) {
pulsarAdmins[i].close();
}
if (fnWorkerServices[i] != null) {
fnWorkerServices[i].stop();
}
if (pulsarServices[i] != null) {
pulsarServices[i].close();
}
}
bkEnsemble.stop();
} finally {
for (int i = 0; i < BROKER_COUNT; i++) {
if (tempDirectories[i] != null) {
tempDirectories[i].delete();
}
}
}
}
@Test
public void testFunctionsCreation() throws Exception {
String jarFilePathUrl = String.format("%s:%s", org.apache.pulsar.common.functions.Utils.FILE,
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
for (int i = 0; i < BROKER_COUNT; i++) {
String functionName = "function-" + i;
FunctionConfig functionConfig = createFunctionConfig(jarFilePathUrl, testTenant, "my-ns",
functionName, "my.*", "sink-topic-" + i, "sub-" + i);
log.info(" -------- Start test function : {}", functionName);
pulsarAdmins[i].functions().createFunctionWithUrl(
functionConfig, jarFilePathUrl
);
FunctionConfig config = pulsarAdmins[i].functions().getFunction(testTenant, "my-ns", functionName);
assertEquals(config.getTenant(), testTenant);
assertEquals(config.getNamespace(), "my-ns");
assertEquals(config.getName(), functionName);
pulsarAdmins[i].functions().deleteFunction(config.getTenant(), config.getNamespace(), config.getName());
}
}
protected static FunctionConfig createFunctionConfig(
String jarFile,
String tenant,
String namespace,
String functionName,
String sourceTopic,
String sinkTopic,
String subscriptionName
) throws JsonProcessingException {
File file = new File(jarFile);
try {
ClassLoader classLoader = ClassLoaderUtils.loadJar(file);
if (classLoader instanceof Closeable) {
((Closeable) classLoader).close();
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to load user jar " + file, e);
}
String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
FunctionConfig functionConfig = new FunctionConfig();
functionConfig.setTenant(tenant);
functionConfig.setNamespace(namespace);
functionConfig.setName(functionName);
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setParallelism(1);
functionConfig.setClassName(IdentityFunction.class.getName());
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
functionConfig.setSubName(subscriptionName);
functionConfig.setTopicsPattern(sourceTopicPattern);
functionConfig.setAutoAck(true);
functionConfig.setOutput(sinkTopic);
log.info("Function Config: {}", new ObjectMapper().writerWithDefaultPrettyPrinter()
.writeValueAsString(functionConfig));
return functionConfig;
}
}