blob: eef95a1a722150aac401ddfc05529018565fd643 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker;
import com.google.common.io.MoreFiles;
import com.google.common.io.RecursiveDeleteOption;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.functions.Utils.FILE;
import static org.apache.pulsar.common.functions.Utils.HTTP;
import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSinkType;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
@Data
@Slf4j
public class FunctionActioner {
private final WorkerConfig workerConfig;
private final RuntimeFactory runtimeFactory;
private final Namespace dlogNamespace;
private final ConnectorsManager connectorsManager;
private final PulsarAdmin pulsarAdmin;
public FunctionActioner(WorkerConfig workerConfig,
RuntimeFactory runtimeFactory,
Namespace dlogNamespace,
ConnectorsManager connectorsManager, PulsarAdmin pulsarAdmin) {
this.workerConfig = workerConfig;
this.runtimeFactory = runtimeFactory;
this.dlogNamespace = dlogNamespace;
this.connectorsManager = connectorsManager;
this.pulsarAdmin = pulsarAdmin;
}
public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) {
try {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
log.info("{}/{}/{}-{} Starting function ...", functionDetails.getTenant(), functionDetails.getNamespace(),
functionDetails.getName(), instanceId);
String packageFile;
String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
if (runtimeFactory.externallyManaged()) {
packageFile = pkgLocation;
} else {
if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
URL url = new URL(pkgLocation);
File pkgFile = new File(url.toURI());
packageFile = pkgFile.getAbsolutePath();
} else if (WorkerUtils.isFunctionCodeBuiltin(functionDetails)) {
File pkgFile = getBuiltinArchive(FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()));
packageFile = pkgFile.getAbsolutePath();
} else {
File pkgDir = new File(workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData, instanceId));
pkgDir.mkdirs();
File pkgFile = new File(
pkgDir,
new File(getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
packageFile = pkgFile.getAbsolutePath();
}
}
RuntimeSpawner runtimeSpawner = getRuntimeSpawner(functionRuntimeInfo.getFunctionInstance(), packageFile);
functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
runtimeSpawner.start();
} catch (Exception ex) {
FunctionDetails details = functionRuntimeInfo.getFunctionInstance()
.getFunctionMetaData().getFunctionDetails();
log.info("{}/{}/{} Error starting function", details.getTenant(), details.getNamespace(),
details.getName(), ex);
functionRuntimeInfo.setStartupException(ex);
}
}
RuntimeSpawner getRuntimeSpawner(Function.Instance instance, String packageFile) {
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
int instanceId = instance.getInstanceId();
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
// check to make sure functionAuthenticationSpec has any data and authentication is enabled.
// If not set to null, since for protobuf,
// even if the field is not set its not going to be null. Have to use the "has" method to check
Function.FunctionAuthenticationSpec functionAuthenticationSpec = null;
if (workerConfig.isAuthenticationEnabled() && instance.getFunctionMetaData().hasFunctionAuthSpec()) {
functionAuthenticationSpec = instance.getFunctionMetaData().getFunctionAuthSpec();
}
InstanceConfig instanceConfig = createInstanceConfig(functionDetailsBuilder.build(),
functionAuthenticationSpec,
instanceId, workerConfig.getPulsarFunctionsCluster());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
functionMetaData.getPackageLocation().getOriginalFileName(),
runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
return runtimeSpawner;
}
InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.FunctionAuthenticationSpec
functionAuthSpec, int instanceId, String clusterName) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
// TODO: set correct function id and version when features implemented
instanceConfig.setFunctionId(UUID.randomUUID().toString());
instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName(clusterName);
instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
instanceConfig.setMaxPendingAsyncRequests(workerConfig.getMaxPendingAsyncRequests());
return instanceConfig;
}
private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaData functionMetaData, int instanceId) throws FileNotFoundException, IOException {
FunctionDetails details = functionMetaData.getFunctionDetails();
File pkgDir = pkgFile.getParentFile();
if (pkgFile.exists()) {
log.warn("Function package exists already {} deleting it",
pkgFile);
pkgFile.delete();
}
File tempPkgFile;
while (true) {
tempPkgFile = new File(
pkgDir,
pkgFile.getName() + "." + instanceId + "." + UUID.randomUUID().toString());
if (!tempPkgFile.exists() && tempPkgFile.createNewFile()) {
break;
}
}
String pkgLocationPath = functionMetaData.getPackageLocation().getPackagePath();
boolean downloadFromHttp = isPkgUrlProvided && pkgLocationPath.startsWith(HTTP);
log.info("{}/{}/{} Function package file {} will be downloaded from {}", tempPkgFile, details.getTenant(),
details.getNamespace(), details.getName(),
downloadFromHttp ? pkgLocationPath : functionMetaData.getPackageLocation());
if(downloadFromHttp) {
FunctionCommon.downloadFromHttpUrl(pkgLocationPath, tempPkgFile);
} else {
FileOutputStream tempPkgFos = new FileOutputStream(tempPkgFile);
WorkerUtils.downloadFromBookkeeper(
dlogNamespace,
tempPkgFos,
pkgLocationPath);
if (tempPkgFos != null) {
tempPkgFos.close();
}
}
try {
// create a hardlink, if there are two concurrent createLink operations, one will fail.
// this ensures one instance will successfully download the package.
try {
Files.createLink(
Paths.get(pkgFile.toURI()),
Paths.get(tempPkgFile.toURI()));
log.info("Function package file is linked from {} to {}",
tempPkgFile, pkgFile);
} catch (FileAlreadyExistsException faee) {
// file already exists
log.warn("Function package has been downloaded from {} and saved at {}",
functionMetaData.getPackageLocation(), pkgFile);
}
} finally {
tempPkgFile.delete();
}
if(details.getRuntime() == Function.FunctionDetails.Runtime.GO && !pkgFile.canExecute()) {
pkgFile.setExecutable(true);
log.info("Golang function package file {} is set to executable", pkgFile);
}
}
private void cleanupFunctionFiles(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
// clean up function package
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData, instance.getInstanceId()));
if (pkgDir.exists()) {
try {
MoreFiles.deleteRecursively(
Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
} catch (IOException e) {
log.warn("Failed to delete package for function: {}",
FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
}
}
}
public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
FunctionDetails details = functionMetaData.getFunctionDetails();
log.info("{}/{}/{}-{} Stopping function...", details.getTenant(), details.getNamespace(), details.getName(),
instance.getInstanceId());
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close();
functionRuntimeInfo.setRuntimeSpawner(null);
}
cleanupFunctionFiles(functionRuntimeInfo);
}
public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
String fqfn = FunctionCommon.getFullyQualifiedName(details);
log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
FunctionDetails funcDetails = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close();
// cleanup any auth data cached
if (workerConfig.isAuthenticationEnabled()) {
functionRuntimeInfo.getRuntimeSpawner()
.getRuntimeFactory().getAuthProvider().ifPresent(functionAuthProvider -> {
try {
log.info("{}-{} Cleaning up authentication data for function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
functionAuthProvider
.cleanUpAuthData(
details,
Optional.ofNullable(getFunctionAuthData(
Optional.ofNullable(
functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec()))));
} catch (Exception e) {
log.error("Failed to cleanup auth data for function: {}", fqfn, e);
}
});
}
functionRuntimeInfo.setRuntimeSpawner(null);
}
cleanupFunctionFiles(functionRuntimeInfo);
//cleanup subscriptions
if (details.getSource().getCleanupSubscription()) {
Map<String, Function.ConsumerSpec> consumerSpecMap = details.getSource().getInputSpecsMap();
consumerSpecMap.entrySet().forEach(new Consumer<Map.Entry<String, Function.ConsumerSpec>>() {
@Override
public void accept(Map.Entry<String, Function.ConsumerSpec> stringConsumerSpecEntry) {
Function.ConsumerSpec consumerSpec = stringConsumerSpecEntry.getValue();
String topic = stringConsumerSpecEntry.getKey();
String subscriptionName = isBlank(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName())
? InstanceUtils.getDefaultSubscriptionName(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails())
: functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails().getSource().getSubscriptionName();
try {
Actions.newBuilder()
.addAction(
Actions.Action.builder()
.actionName(String.format("Cleaning up subscriptions for function %s", fqfn))
.numRetries(10)
.sleepBetweenInvocationsMs(1000)
.supplier(() -> {
try {
if (consumerSpec.getIsRegexPattern()) {
pulsarAdmin.namespaces().unsubscribeNamespace(TopicName
.get(topic).getNamespace(), subscriptionName);
} else {
pulsarAdmin.topics().deleteSubscription(topic,
subscriptionName);
}
} catch (PulsarAdminException e) {
if (e instanceof PulsarAdminException.NotFoundException) {
return Actions.ActionResult.builder()
.success(true)
.build();
} else {
// for debugging purposes
List<Map<String, String>> existingConsumers = Collections.emptyList();
try {
TopicStats stats = pulsarAdmin.topics().getStats(topic);
SubscriptionStats sub = stats.subscriptions.get(subscriptionName);
if (sub != null) {
existingConsumers = sub.consumers.stream()
.map(consumerStats -> consumerStats.metadata)
.collect(Collectors.toList());
}
} catch (PulsarAdminException e1) {
}
String errorMsg = e.getHttpError() != null ? e.getHttpError() : e.getMessage();
return Actions.ActionResult.builder()
.success(false)
.errorMsg(String.format("%s - existing consumers: %s", errorMsg, existingConsumers))
.build();
}
}
return Actions.ActionResult.builder()
.success(true)
.build();
})
.build())
.run();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
}
private String getDownloadPackagePath(FunctionMetaData functionMetaData, int instanceId) {
return StringUtils.join(
new String[]{
functionMetaData.getFunctionDetails().getTenant(),
functionMetaData.getFunctionDetails().getNamespace(),
functionMetaData.getFunctionDetails().getName(),
Integer.toString(instanceId),
},
File.separatorChar);
}
private File getBuiltinArchive(FunctionDetails.Builder functionDetails) throws IOException, ClassNotFoundException {
if (functionDetails.hasSource()) {
SourceSpec sourceSpec = functionDetails.getSource();
if (!StringUtils.isEmpty(sourceSpec.getBuiltin())) {
File archive = connectorsManager.getSourceArchive(sourceSpec.getBuiltin()).toFile();
String sourceClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSourceClass();
SourceSpec.Builder builder = SourceSpec.newBuilder(functionDetails.getSource());
builder.setClassName(sourceClass);
functionDetails.setSource(builder);
fillSourceTypeClass(functionDetails, archive, sourceClass);
return archive;
}
}
if (functionDetails.hasSink()) {
SinkSpec sinkSpec = functionDetails.getSink();
if (!StringUtils.isEmpty(sinkSpec.getBuiltin())) {
File archive = connectorsManager.getSinkArchive(sinkSpec.getBuiltin()).toFile();
String sinkClass = ConnectorUtils.getConnectorDefinition(archive.toString()).getSinkClass();
SinkSpec.Builder builder = SinkSpec.newBuilder(functionDetails.getSink());
builder.setClassName(sinkClass);
functionDetails.setSink(builder);
fillSinkTypeClass(functionDetails, archive, sinkClass);
return archive;
}
}
throw new IOException("Could not find built in archive definition");
}
private void fillSourceTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
throws IOException, ClassNotFoundException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
String typeArg = getSourceType(className, ncl).getName();
SourceSpec.Builder sourceBuilder = SourceSpec.newBuilder(functionDetails.getSource());
sourceBuilder.setTypeClassName(typeArg);
functionDetails.setSource(sourceBuilder);
SinkSpec sinkSpec = functionDetails.getSink();
if (null == sinkSpec || StringUtils.isEmpty(sinkSpec.getTypeClassName())) {
SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(sinkSpec);
sinkBuilder.setTypeClassName(typeArg);
functionDetails.setSink(sinkBuilder);
}
}
}
private void fillSinkTypeClass(FunctionDetails.Builder functionDetails, File archive, String className)
throws IOException, ClassNotFoundException {
try (NarClassLoader ncl = NarClassLoader.getFromArchive(archive, Collections.emptySet())) {
String typeArg = getSinkType(className, ncl).getName();
SinkSpec.Builder sinkBuilder = SinkSpec.newBuilder(functionDetails.getSink());
sinkBuilder.setTypeClassName(typeArg);
functionDetails.setSink(sinkBuilder);
SourceSpec sourceSpec = functionDetails.getSource();
if (null == sourceSpec || StringUtils.isEmpty(sourceSpec.getTypeClassName())) {
SourceSpec.Builder sourceBuilder = SourceSpec.newBuilder(sourceSpec);
sourceBuilder.setTypeClassName(typeArg);
functionDetails.setSource(sourceBuilder);
}
}
}
private static String getDownloadFileName(FunctionDetails FunctionDetails,
Function.PackageLocationMetaData packageLocation) {
if (!org.apache.commons.lang.StringUtils.isEmpty(packageLocation.getOriginalFileName())) {
return packageLocation.getOriginalFileName();
}
String[] hierarchy = FunctionDetails.getClassName().split("\\.");
String fileName;
if (hierarchy.length <= 0) {
fileName = FunctionDetails.getClassName();
} else if (hierarchy.length == 1) {
fileName = hierarchy[0];
} else {
fileName = hierarchy[hierarchy.length - 2];
}
switch (FunctionDetails.getRuntime()) {
case JAVA:
return fileName + ".jar";
case PYTHON:
return fileName + ".py";
case GO:
return fileName + ".go";
default:
throw new RuntimeException("Unknown runtime " + FunctionDetails.getRuntime());
}
}
}