| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.functions.worker.rest.api; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static java.util.concurrent.TimeUnit.SECONDS; |
| import static org.apache.commons.lang3.StringUtils.isEmpty; |
| import static org.apache.commons.lang3.StringUtils.isNotBlank; |
| import static org.apache.pulsar.functions.utils.FunctionCommon.createPkgTempFile; |
| import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName; |
| import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException; |
| import com.google.common.base.Utf8; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.net.URI; |
| import java.net.URL; |
| import java.nio.ByteBuffer; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.Base64; |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Supplier; |
| import javax.ws.rs.WebApplicationException; |
| import javax.ws.rs.core.Response; |
| import javax.ws.rs.core.Response.Status; |
| import javax.ws.rs.core.StreamingOutput; |
| import javax.ws.rs.core.UriBuilder; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.pulsar.broker.authentication.AuthenticationDataSource; |
| import org.apache.pulsar.broker.authentication.AuthenticationParameters; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.admin.internal.FunctionsImpl; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.Producer; |
| import org.apache.pulsar.client.api.Reader; |
| import org.apache.pulsar.client.api.Schema; |
| import org.apache.pulsar.client.api.SchemaSerializationException; |
| import org.apache.pulsar.common.functions.FunctionConfig; |
| import org.apache.pulsar.common.functions.FunctionState; |
| import org.apache.pulsar.common.functions.Utils; |
| import org.apache.pulsar.common.functions.WorkerInfo; |
| import org.apache.pulsar.common.io.ConnectorDefinition; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.FunctionInstanceStatsDataImpl; |
| import org.apache.pulsar.common.policies.data.FunctionStatsImpl; |
| import org.apache.pulsar.common.util.Codec; |
| import org.apache.pulsar.common.util.RestException; |
| import org.apache.pulsar.functions.instance.InstanceUtils; |
| import org.apache.pulsar.functions.instance.state.DefaultStateStore; |
| import org.apache.pulsar.functions.proto.Function; |
| import org.apache.pulsar.functions.proto.Function.FunctionDetails; |
| import org.apache.pulsar.functions.proto.Function.FunctionMetaData; |
| import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData; |
| import org.apache.pulsar.functions.proto.Function.SinkSpec; |
| import org.apache.pulsar.functions.proto.Function.SourceSpec; |
| import org.apache.pulsar.functions.proto.InstanceCommunication; |
| import org.apache.pulsar.functions.runtime.RuntimeSpawner; |
| import org.apache.pulsar.functions.utils.ComponentTypeUtils; |
| import org.apache.pulsar.functions.utils.FunctionCommon; |
| import org.apache.pulsar.functions.utils.FunctionConfigUtils; |
| import org.apache.pulsar.functions.utils.FunctionMetaDataUtils; |
| import org.apache.pulsar.functions.utils.functions.FunctionArchive; |
| import org.apache.pulsar.functions.utils.io.Connector; |
| import org.apache.pulsar.functions.worker.FunctionMetaDataManager; |
| import org.apache.pulsar.functions.worker.FunctionRuntimeInfo; |
| import org.apache.pulsar.functions.worker.FunctionRuntimeManager; |
| import org.apache.pulsar.functions.worker.PulsarWorkerService; |
| import org.apache.pulsar.functions.worker.WorkerService; |
| import org.apache.pulsar.functions.worker.WorkerUtils; |
| import org.apache.pulsar.functions.worker.service.api.Component; |
| import org.apache.pulsar.packages.management.core.common.PackageMetadata; |
| import org.apache.pulsar.packages.management.core.common.PackageName; |
| import org.glassfish.jersey.media.multipart.FormDataContentDisposition; |
| |
| @Slf4j |
| public abstract class ComponentImpl implements Component<PulsarWorkerService> { |
| |
| protected final Supplier<PulsarWorkerService> workerServiceSupplier; |
| protected final Function.FunctionDetails.ComponentType componentType; |
| |
| public ComponentImpl(Supplier<PulsarWorkerService> workerServiceSupplier, |
| Function.FunctionDetails.ComponentType componentType) { |
| this.workerServiceSupplier = workerServiceSupplier; |
| this.componentType = componentType; |
| } |
| |
| protected abstract class GetStatus<X, T> { |
| |
| public abstract T notScheduledInstance(); |
| |
| public abstract T fromFunctionStatusProto(InstanceCommunication.FunctionStatus status, |
| String assignedWorkerId); |
| |
| public abstract T notRunning(String assignedWorkerId, String error); |
| |
| public T getComponentInstanceStatus(final String tenant, |
| final String namespace, |
| final String name, |
| final int instanceId, |
| final URI uri) { |
| |
| Function.Assignment assignment; |
| if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) { |
| assignment = worker().getFunctionRuntimeManager().findFunctionAssignment(tenant, namespace, name, -1); |
| } else { |
| assignment = worker().getFunctionRuntimeManager() |
| .findFunctionAssignment(tenant, namespace, name, instanceId); |
| } |
| |
| if (assignment == null) { |
| return notScheduledInstance(); |
| } |
| |
| final String assignedWorkerId = assignment.getWorkerId(); |
| final String workerId = worker().getWorkerConfig().getWorkerId(); |
| |
| // If I am running worker |
| if (assignedWorkerId.equals(workerId)) { |
| FunctionRuntimeInfo functionRuntimeInfo = worker().getFunctionRuntimeManager().getFunctionRuntimeInfo( |
| FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance())); |
| if (functionRuntimeInfo == null) { |
| return notRunning(assignedWorkerId, ""); |
| } |
| RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner(); |
| |
| if (runtimeSpawner != null) { |
| try { |
| return fromFunctionStatusProto( |
| functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get(), |
| assignedWorkerId); |
| } catch (InterruptedException | ExecutionException e) { |
| throw new RuntimeException(e); |
| } |
| } else { |
| String message = functionRuntimeInfo.getStartupException() != null |
| ? functionRuntimeInfo.getStartupException().getMessage() : ""; |
| return notRunning(assignedWorkerId, message); |
| } |
| } else { |
| // query other worker |
| |
| List<WorkerInfo> workerInfoList = worker().getMembershipManager().getCurrentMembership(); |
| WorkerInfo workerInfo = null; |
| for (WorkerInfo entry : workerInfoList) { |
| if (assignment.getWorkerId().equals(entry.getWorkerId())) { |
| workerInfo = entry; |
| } |
| } |
| if (workerInfo == null) { |
| return notScheduledInstance(); |
| } |
| |
| if (uri == null) { |
| throw new WebApplicationException( |
| Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build()); |
| } else { |
| URI redirect = |
| UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()) |
| .build(); |
| throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); |
| } |
| } |
| } |
| |
| public abstract X getStatus(String tenant, |
| String namespace, |
| String name, |
| Collection<Function.Assignment> assignments, |
| URI uri) throws PulsarAdminException; |
| |
| public abstract X getStatusExternal(String tenant, |
| String namespace, |
| String name, |
| int parallelism); |
| |
| public abstract X emptyStatus(int parallelism); |
| |
| public X getComponentStatus(final String tenant, |
| final String namespace, |
| final String name, |
| final URI uri) { |
| |
| Function.FunctionMetaData functionMetaData = |
| worker().getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, name); |
| |
| Collection<Function.Assignment> assignments = |
| worker().getFunctionRuntimeManager().findFunctionAssignments(tenant, namespace, name); |
| |
| // TODO refactor the code for externally managed. |
| if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) { |
| Function.Assignment assignment = assignments.iterator().next(); |
| boolean isOwner = worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId()); |
| if (isOwner) { |
| return getStatusExternal(tenant, namespace, name, |
| functionMetaData.getFunctionDetails().getParallelism()); |
| } else { |
| |
| // find the hostname/port of the worker who is the owner |
| |
| List<WorkerInfo> workerInfoList = worker().getMembershipManager().getCurrentMembership(); |
| WorkerInfo workerInfo = null; |
| for (WorkerInfo entry : workerInfoList) { |
| if (assignment.getWorkerId().equals(entry.getWorkerId())) { |
| workerInfo = entry; |
| } |
| } |
| if (workerInfo == null) { |
| return emptyStatus(functionMetaData.getFunctionDetails().getParallelism()); |
| } |
| |
| if (uri == null) { |
| throw new WebApplicationException( |
| Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build()); |
| } else { |
| URI redirect = |
| UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()) |
| .build(); |
| throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); |
| } |
| } |
| } else { |
| try { |
| return getStatus(tenant, namespace, name, assignments, uri); |
| } catch (PulsarAdminException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public PulsarWorkerService worker() { |
| try { |
| return Objects.requireNonNull(workerServiceSupplier.get()); |
| } catch (Throwable t) { |
| log.info("Failed to get worker service", t); |
| throw t; |
| } |
| } |
| |
| boolean isWorkerServiceAvailable() { |
| WorkerService workerService = workerServiceSupplier.get(); |
| if (workerService == null) { |
| return false; |
| } |
| return workerService.isInitialized(); |
| } |
| |
| PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData, |
| final String functionPkgUrl, |
| final FormDataContentDisposition fileDetail, |
| final File uploadedInputStreamAsFile) |
| throws Exception { |
| return getFunctionPackageLocation(functionMetaData, functionPkgUrl, fileDetail, uploadedInputStreamAsFile, |
| functionMetaData.getFunctionDetails().getName(), componentType, |
| getFunctionCodeBuiltin(functionMetaData.getFunctionDetails(), componentType)); |
| } |
| |
| PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData, |
| final String functionPkgUrl, |
| final FormDataContentDisposition fileDetail, |
| final File uploadedInputStreamAsFile, |
| final String componentName, |
| final FunctionDetails.ComponentType componentType, |
| final String builtin) |
| throws Exception { |
| FunctionDetails functionDetails = functionMetaData.getFunctionDetails(); |
| String tenant = functionDetails.getTenant(); |
| String namespace = functionDetails.getNamespace(); |
| PackageLocationMetaData.Builder packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder(); |
| boolean isPkgUrlProvided = isNotBlank(functionPkgUrl); |
| boolean isPackageManagementEnabled = worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement(); |
| PackageName packageName = PackageName.get( |
| componentType.name(), |
| tenant, |
| namespace, |
| componentName, |
| String.valueOf(functionMetaData.getVersion())); |
| PackageMetadata metadata = PackageMetadata.builder() |
| .createTime(functionMetaData.getCreateTime()).build(); |
| if (worker().getFunctionRuntimeManager().getRuntimeFactory().externallyManaged()) { |
| // For externally managed schedulers, the pkgUrl/builtin stuff can be copied to bk |
| // if the function worker image does not include connectors |
| if (!isEmpty(builtin)) { |
| File component; |
| switch (componentType) { |
| case SOURCE: |
| component = worker().getConnectorsManager().getSourceArchive(builtin).toFile(); |
| break; |
| case SINK: |
| component = worker().getConnectorsManager().getSinkArchive(builtin).toFile(); |
| break; |
| default: |
| component = worker().getFunctionsManager().getFunctionArchive(builtin).toFile(); |
| break; |
| } |
| packageLocationMetaDataBuilder.setOriginalFileName(component.getName()); |
| if (worker().getWorkerConfig().getUploadBuiltinSinksSources()) { |
| packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, |
| component.getName())); |
| if (isPackageManagementEnabled) { |
| packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); |
| worker().getBrokerAdmin().packages().upload(metadata, |
| packageName.toString(), component.getAbsolutePath()); |
| } else { |
| packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, |
| componentName, component.getName())); |
| WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), |
| component, worker().getDlogNamespace()); |
| } |
| log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), |
| packageLocationMetaDataBuilder.getPackagePath()); |
| } else { |
| log.info("Skipping upload for the built-in package {}", ComponentTypeUtils.toString(componentType)); |
| packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin); |
| } |
| } else if (isPkgUrlProvided) { |
| packageLocationMetaDataBuilder.setOriginalFileName(uploadedInputStreamAsFile.getName()); |
| if (isPackageManagementEnabled) { |
| packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl); |
| if (!Utils.hasPackageTypePrefix(functionPkgUrl)) { |
| packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); |
| worker().getBrokerAdmin().packages().upload(metadata, |
| packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); |
| } |
| } else { |
| packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, |
| uploadedInputStreamAsFile.getName())); |
| WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), |
| uploadedInputStreamAsFile, worker().getDlogNamespace()); |
| } |
| log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), |
| packageLocationMetaDataBuilder.getPackagePath()); |
| } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) |
| || functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) { |
| String fileName = |
| new File(new URL(functionMetaData.getPackageLocation().getPackagePath()).toURI()).getName(); |
| packageLocationMetaDataBuilder.setOriginalFileName(fileName); |
| if (isPackageManagementEnabled) { |
| packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); |
| worker().getBrokerAdmin().packages().upload(metadata, |
| packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); |
| } else { |
| packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, |
| fileName)); |
| WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), |
| uploadedInputStreamAsFile, worker().getDlogNamespace()); |
| } |
| log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), |
| packageLocationMetaDataBuilder.getPackagePath()); |
| } else { |
| packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); |
| if (isPackageManagementEnabled) { |
| packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); |
| worker().getBrokerAdmin().packages().upload(metadata, |
| packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); |
| } else { |
| packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, |
| fileDetail.getFileName())); |
| WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), |
| uploadedInputStreamAsFile, worker().getDlogNamespace()); |
| } |
| log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), |
| packageLocationMetaDataBuilder.getPackagePath()); |
| } |
| } else { |
| // For pulsar managed schedulers, the pkgUrl/builtin stuff should be copied to bk |
| if (!isEmpty(builtin)) { |
| packageLocationMetaDataBuilder.setPackagePath("builtin://" + builtin); |
| } else if (isPkgUrlProvided) { |
| packageLocationMetaDataBuilder.setPackagePath(functionPkgUrl); |
| } else if (functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.HTTP) |
| || functionMetaData.getPackageLocation().getPackagePath().startsWith(Utils.FILE)) { |
| packageLocationMetaDataBuilder.setPackagePath(functionMetaData.getPackageLocation().getPackagePath()); |
| } else { |
| packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName()); |
| if (isPackageManagementEnabled) { |
| packageLocationMetaDataBuilder.setPackagePath(packageName.toString()); |
| worker().getBrokerAdmin().packages().upload(metadata, |
| packageName.toString(), uploadedInputStreamAsFile.getAbsolutePath()); |
| } else { |
| packageLocationMetaDataBuilder.setPackagePath(createPackagePath(tenant, namespace, componentName, |
| fileDetail.getFileName())); |
| WorkerUtils.uploadFileToBookkeeper(packageLocationMetaDataBuilder.getPackagePath(), |
| uploadedInputStreamAsFile, worker().getDlogNamespace()); |
| } |
| log.info("Uploading {} package to {}", ComponentTypeUtils.toString(componentType), |
| packageLocationMetaDataBuilder.getPackagePath()); |
| } |
| } |
| return packageLocationMetaDataBuilder; |
| } |
| |
| @Override |
| public void deregisterFunction(final String tenant, |
| final String namespace, |
| final String componentName, |
| final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "deregister", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateDeregisterRequestParams(tenant, namespace, componentName, componentType); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid deregister {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.error("{} to deregister does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData newVersionedMetaData = |
| FunctionMetaDataUtils.incrMetadataVersion(functionMetaData, functionMetaData); |
| internalProcessFunctionRequest(newVersionedMetaData.getFunctionDetails().getTenant(), |
| newVersionedMetaData.getFunctionDetails().getNamespace(), |
| newVersionedMetaData.getFunctionDetails().getName(), |
| newVersionedMetaData, true, |
| String.format("Error deleting %s @ /%s/%s/%s", |
| ComponentTypeUtils.toString(componentType), tenant, namespace, componentName)); |
| |
| deleteComponentFromStorage(tenant, namespace, componentName, |
| functionMetaData.getPackageLocation().getPackagePath()); |
| |
| if (!isEmpty(functionMetaData.getTransformFunctionPackageLocation().getPackagePath())) { |
| deleteComponentFromStorage(tenant, namespace, componentName, |
| functionMetaData.getTransformFunctionPackageLocation().getPackagePath()); |
| } |
| |
| if (worker().getStateStoreProvider() != null) { |
| try { |
| worker().getStateStoreProvider().cleanUp(tenant, namespace, componentName); |
| } catch (Throwable e) { |
| log.error("failed to clean up the state store for {}/{}/{}", tenant, namespace, componentName); |
| } |
| } |
| } |
| |
| private void deleteComponentFromStorage(String tenant, String namespace, String componentName, |
| String functionPackagePath) { |
| if (!functionPackagePath.startsWith(Utils.HTTP) |
| && !functionPackagePath.startsWith(Utils.FILE) |
| && !functionPackagePath.startsWith(Utils.BUILTIN) |
| && !worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { |
| try { |
| WorkerUtils.deleteFromBookkeeper(worker().getDlogNamespace(), functionPackagePath); |
| } catch (IOException e) { |
| log.error("{}/{}/{} Failed to cleanup package in BK with path {}", tenant, namespace, componentName, |
| functionPackagePath, e); |
| } |
| |
| } |
| } |
| |
| @Override |
| public FunctionConfig getFunctionInfo(final String tenant, |
| final String namespace, |
| final String componentName, |
| final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid get {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, |
| componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName)); |
| } |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format(ComponentTypeUtils.toString(componentType) + " %s doesn't exist", componentName)); |
| } |
| return FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails()); |
| } |
| |
| @Override |
| public void stopFunctionInstance(final String tenant, |
| final String namespace, |
| final String componentName, |
| final String instanceId, |
| final URI uri, |
| final AuthenticationParameters authParams) { |
| changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, false, uri, authParams); |
| } |
| |
| @Override |
| public void startFunctionInstance(final String tenant, |
| final String namespace, |
| final String componentName, |
| final String instanceId, |
| final URI uri, |
| final AuthenticationParameters authParams) { |
| changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, true, uri, authParams); |
| } |
| |
| @Deprecated |
| public void changeFunctionInstanceStatus(final String tenant, |
| final String namespace, |
| final String componentName, |
| final String instanceId, |
| final boolean start, |
| final URI uri, |
| final String clientRole, |
| final AuthenticationDataSource clientAuthenticationDataHttps) { |
| AuthenticationParameters authParams = AuthenticationParameters.builder() |
| .clientRole(clientRole) |
| .clientAuthenticationDataSource(clientAuthenticationDataHttps) |
| .build(); |
| changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, start, uri, authParams); |
| } |
| |
| public void changeFunctionInstanceStatus(final String tenant, |
| final String namespace, |
| final String componentName, |
| final String instanceId, |
| final boolean start, |
| final URI uri, |
| final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "start/stop", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, |
| componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| if (!FunctionMetaDataUtils.canChangeState(functionMetaData, Integer.parseInt(instanceId), |
| start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) { |
| log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName); |
| throw new RestException(Status.BAD_REQUEST, "Operation not permitted"); |
| } |
| |
| FunctionMetaData newFunctionMetaData = FunctionMetaDataUtils |
| .changeFunctionInstanceStatus(functionMetaData, Integer.parseInt(instanceId), start); |
| internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false, |
| String.format("Failed to start/stop %s: %s/%s/%s/%s", ComponentTypeUtils.toString(componentType), |
| tenant, namespace, componentName, instanceId)); |
| } |
| |
| @Override |
| public void restartFunctionInstance(final String tenant, |
| final String namespace, |
| final String componentName, |
| final String instanceId, |
| final URI uri, |
| final AuthenticationParameters authParams) { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "restart", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, |
| componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); |
| try { |
| functionRuntimeManager.restartFunctionInstance(tenant, namespace, componentName, |
| Integer.parseInt(instanceId), uri); |
| } catch (WebApplicationException we) { |
| throw we; |
| } catch (Exception e) { |
| log.error("Failed to restart {}: {}/{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, instanceId, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| @Override |
| public void stopFunctionInstances(final String tenant, |
| final String namespace, |
| final String componentName, |
| final AuthenticationParameters authParams) { |
| changeFunctionStatusAllInstances(tenant, namespace, componentName, false, authParams); |
| } |
| |
| @Override |
| public void startFunctionInstances(final String tenant, |
| final String namespace, |
| final String componentName, |
| final AuthenticationParameters authParams) { |
| changeFunctionStatusAllInstances(tenant, namespace, componentName, true, authParams); |
| } |
| |
| @Deprecated |
| public void changeFunctionStatusAllInstances(final String tenant, |
| final String namespace, |
| final String componentName, |
| final boolean start, |
| final String clientRole, |
| final AuthenticationDataSource clientAuthenticationDataHttps) { |
| AuthenticationParameters authParams = AuthenticationParameters.builder().clientRole(clientRole) |
| .clientAuthenticationDataSource(clientAuthenticationDataHttps).build(); |
| changeFunctionStatusAllInstances(tenant, namespace, componentName, start, authParams); |
| } |
| |
| public void changeFunctionStatusAllInstances(final String tenant, |
| final String namespace, |
| final String componentName, |
| final boolean start, |
| final AuthenticationParameters authParams) { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "start/stop", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid start/stop {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.warn("{} in stopFunctionInstances does not exist @ /{}/{}/{}", |
| ComponentTypeUtils.toString(componentType), tenant, namespace, componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| if (!FunctionMetaDataUtils.canChangeState(functionMetaData, -1, |
| start ? Function.FunctionState.RUNNING : Function.FunctionState.STOPPED)) { |
| log.error("Operation not permitted on {}/{}/{}", tenant, namespace, componentName); |
| throw new RestException(Status.BAD_REQUEST, "Operation not permitted"); |
| } |
| |
| FunctionMetaData newFunctionMetaData = |
| FunctionMetaDataUtils.changeFunctionInstanceStatus(functionMetaData, -1, start); |
| internalProcessFunctionRequest(tenant, namespace, componentName, newFunctionMetaData, false, |
| String.format("Failed to start/stop %s: %s/%s/%s", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName)); |
| } |
| |
| @Override |
| public void restartFunctionInstances(final String tenant, |
| final String namespace, |
| final String componentName, |
| final AuthenticationParameters authParams) { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "restart", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid restart {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.warn("{} in stopFunctionInstances does not exist @ /{}/{}/{}", |
| ComponentTypeUtils.toString(componentType), tenant, namespace, componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); |
| try { |
| functionRuntimeManager.restartFunctionInstances(tenant, namespace, componentName); |
| } catch (WebApplicationException we) { |
| throw we; |
| } catch (Exception e) { |
| log.error("Failed to restart {}: {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, |
| componentName, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| @Override |
| public FunctionStatsImpl getFunctionStats(final String tenant, |
| final String namespace, |
| final String componentName, |
| final URI uri, |
| final AuthenticationParameters authParams) { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get stats for", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.warn("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), |
| componentType, tenant, namespace, componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); |
| FunctionStatsImpl functionStats; |
| try { |
| functionStats = functionRuntimeManager.getFunctionStats(tenant, namespace, componentName, uri); |
| } catch (WebApplicationException we) { |
| throw we; |
| } catch (Exception e) { |
| log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| |
| return functionStats; |
| } |
| |
| @Override |
| public FunctionInstanceStatsDataImpl |
| getFunctionsInstanceStats(final String tenant, |
| final String namespace, |
| final String componentName, |
| final String instanceId, |
| final URI uri, |
| final AuthenticationParameters authParams) { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get stats for", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid get {} Stats request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.warn("{} in get {} Stats does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), |
| componentType, tenant, namespace, componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| |
| } |
| int instanceIdInt = Integer.parseInt(instanceId); |
| if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) { |
| log.error("instanceId in get {} Stats out of bounds @ /{}/{}/{}", |
| ComponentTypeUtils.toString(componentType), tenant, namespace, componentName); |
| throw new RestException(Status.BAD_REQUEST, |
| String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(componentType), |
| componentName, instanceId)); |
| } |
| |
| FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); |
| FunctionInstanceStatsDataImpl functionInstanceStatsData; |
| try { |
| functionInstanceStatsData = |
| functionRuntimeManager.getFunctionInstanceStats(tenant, namespace, componentName, |
| Integer.parseInt(instanceId), uri); |
| } catch (WebApplicationException we) { |
| throw we; |
| } catch (Exception e) { |
| log.error("{}/{}/{} Got Exception Getting Stats", tenant, namespace, componentName, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| |
| return functionInstanceStatsData; |
| } |
| |
| @Override |
| public List<String> listFunctions(final String tenant, |
| final String namespace, |
| final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, null, "list", authParams); |
| |
| // validate parameters |
| try { |
| validateListFunctionRequestParams(tenant, namespace); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid list {} request @ /{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, |
| e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| |
| Collection<FunctionMetaData> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace); |
| List<String> retVals = new LinkedList<>(); |
| for (FunctionMetaData functionMetaData : functionStateList) { |
| if (InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| retVals.add(functionMetaData.getFunctionDetails().getName()); |
| } |
| } |
| return retVals; |
| } |
| |
| void updateRequest(FunctionMetaData existingFunctionMetaData, final FunctionMetaData functionMetaData) { |
| FunctionMetaData updatedVersionMetaData = |
| FunctionMetaDataUtils.incrMetadataVersion(existingFunctionMetaData, functionMetaData); |
| internalProcessFunctionRequest(updatedVersionMetaData.getFunctionDetails().getTenant(), |
| updatedVersionMetaData.getFunctionDetails().getNamespace(), |
| updatedVersionMetaData.getFunctionDetails().getName(), |
| updatedVersionMetaData, false, "Update Failed"); |
| } |
| |
| @Override |
| public List<ConnectorDefinition> getListOfConnectors() { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| return this.worker().getConnectorsManager().getConnectorDefinitions(); |
| } |
| |
| @Override |
| public void reloadConnectors(AuthenticationParameters authParams) { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| if (worker().getWorkerConfig().isAuthorizationEnabled()) { |
| // Only superuser has permission to do this operation. |
| if (!isSuperUser(authParams)) { |
| throw new RestException(Status.UNAUTHORIZED, "This operation requires super-user access"); |
| } |
| } |
| try { |
| this.worker().getConnectorsManager().reloadConnectors(worker().getWorkerConfig()); |
| } catch (IOException e) { |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| @Override |
| public String triggerFunction(final String tenant, |
| final String namespace, |
| final String functionName, |
| final String input, |
| final InputStream uploadedInputStream, |
| final String topic, |
| final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, functionName, "trigger", authParams); |
| |
| // validate parameters |
| try { |
| validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid trigger function request @ /{}/{}/{}", tenant, namespace, functionName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { |
| log.warn("Function in trigger function does not exist @ /{}/{}/{}", tenant, namespace, functionName); |
| throw new RestException(Status.NOT_FOUND, String.format("Function %s doesn't exist", functionName)); |
| } |
| |
| FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, |
| functionName); |
| |
| String inputTopicToWrite; |
| if (topic != null) { |
| inputTopicToWrite = topic; |
| } else if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 1) { |
| inputTopicToWrite = functionMetaData.getFunctionDetails().getSource().getInputSpecsMap() |
| .keySet().iterator().next(); |
| } else { |
| log.error("Function in trigger function has more than 1 input topics @ /{}/{}/{}", tenant, namespace, |
| functionName); |
| throw new RestException(Status.BAD_REQUEST, "Function in trigger function has more than 1 input topics"); |
| } |
| if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0 |
| || !functionMetaData.getFunctionDetails().getSource().getInputSpecsMap() |
| .containsKey(inputTopicToWrite)) { |
| log.error("Function in trigger function has unidentified topic @ /{}/{}/{} {}", tenant, namespace, |
| functionName, inputTopicToWrite); |
| throw new RestException(Status.BAD_REQUEST, "Function in trigger function has unidentified topic"); |
| } |
| try { |
| worker().getBrokerAdmin().topics().getSubscriptions(inputTopicToWrite); |
| } catch (PulsarAdminException e) { |
| log.error("Function in trigger function is not ready @ /{}/{}/{}", tenant, namespace, functionName); |
| throw new RestException(Status.BAD_REQUEST, "Function in trigger function is not ready"); |
| } |
| String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic(); |
| Reader<byte[]> reader = null; |
| Producer<byte[]> producer = null; |
| try { |
| if (!isEmpty(outputTopic)) { |
| reader = worker().getClient().newReader() |
| .topic(outputTopic) |
| .startMessageId(MessageId.latest) |
| .readerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" |
| + FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName)) |
| .create(); |
| } |
| producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES()) |
| .topic(inputTopicToWrite) |
| .producerName(worker().getWorkerConfig().getWorkerId() + "-trigger-" |
| + FunctionCommon.getFullyQualifiedName(tenant, namespace, functionName)) |
| .create(); |
| byte[] targetArray; |
| if (uploadedInputStream != null) { |
| targetArray = new byte[uploadedInputStream.available()]; |
| uploadedInputStream.read(targetArray); |
| } else { |
| targetArray = input.getBytes(); |
| } |
| MessageId msgId = producer.send(targetArray); |
| if (reader == null) { |
| return null; |
| } |
| long curTime = System.currentTimeMillis(); |
| long maxTime = curTime + 1000; |
| while (curTime < maxTime) { |
| Message msg = reader.readNext(10000, TimeUnit.MILLISECONDS); |
| if (msg == null) { |
| break; |
| } |
| if (msg.getProperties().containsKey("__pfn_input_msg_id__") |
| && msg.getProperties().containsKey("__pfn_input_topic__")) { |
| MessageId newMsgId = MessageId.fromByteArray( |
| Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__"))); |
| |
| if (msgId.equals(newMsgId) |
| && msg.getProperties().get("__pfn_input_topic__") |
| .equals(TopicName.get(inputTopicToWrite).toString())) { |
| return new String(msg.getData()); |
| } |
| } |
| curTime = System.currentTimeMillis(); |
| } |
| throw new RestException(Status.REQUEST_TIMEOUT, "Request Timed Out"); |
| } catch (SchemaSerializationException e) { |
| throw new RestException(Status.BAD_REQUEST, String.format( |
| "Failed to serialize input with error: %s. Please check" |
| + "if input data conforms with the schema of the input topic.", |
| e.getMessage())); |
| } catch (IOException e) { |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } finally { |
| if (reader != null) { |
| reader.closeAsync(); |
| } |
| if (producer != null) { |
| producer.closeAsync(); |
| } |
| } |
| } |
| |
| @Override |
| public FunctionState getFunctionState(final String tenant, |
| final String namespace, |
| final String functionName, |
| final String key, |
| final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, functionName, "get state for", |
| authParams); |
| |
| if (null == worker().getStateStoreProvider()) { |
| throwStateStoreUnvailableResponse(); |
| } |
| |
| // validate parameters |
| try { |
| validateFunctionStateParams(tenant, namespace, functionName, key); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid getFunctionState request @ /{}/{}/{}/{}", |
| tenant, namespace, functionName, key, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| try { |
| DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName); |
| ByteBuffer buf = store.get(key); |
| if (buf == null) { |
| throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist."); |
| } |
| |
| // try to parse the state as a long |
| // but even if it can be parsed as a long, this number may not be the actual state, |
| // so we will always return a `stringValue` or `bytesValue` with the number value |
| Long number = null; |
| if (buf.remaining() == Long.BYTES) { |
| number = buf.getLong(); |
| } |
| |
| if (Utf8.isWellFormed(buf.array())) { |
| return new FunctionState(key, new String(buf.array(), UTF_8), null, number, null); |
| } else { |
| return new FunctionState(key, null, buf.array(), number, null); |
| } |
| } catch (RestException e) { |
| throw e; |
| } catch (Throwable e) { |
| log.error("Error while getFunctionState request @ /{}/{}/{}/{}", |
| tenant, namespace, functionName, key, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| @Override |
| public void putFunctionState(final String tenant, |
| final String namespace, |
| final String functionName, |
| final String key, |
| final FunctionState state, |
| final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| if (null == worker().getStateStoreProvider()) { |
| throwStateStoreUnvailableResponse(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, functionName, "put state for", |
| authParams); |
| |
| if (!key.equals(state.getKey())) { |
| log.error("{}/{}/{} Bad putFunction Request, path key doesn't match key in json", tenant, namespace, |
| functionName); |
| throw new RestException(Status.BAD_REQUEST, "Path key doesn't match key in json"); |
| } |
| |
| // validate parameters |
| try { |
| validateFunctionStateParams(tenant, namespace, functionName, key); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid putFunctionState request @ /{}/{}/{}/{}", |
| tenant, namespace, functionName, key, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| try { |
| DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName); |
| ByteBuffer data; |
| if (StringUtils.isNotEmpty(state.getStringValue())) { |
| data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8)); |
| } else if (state.getByteValue() != null) { |
| data = ByteBuffer.wrap(state.getByteValue()); |
| } else if (state.getNumberValue() != null) { |
| data = ByteBuffer.allocate(Long.BYTES); |
| data.putLong(state.getNumberValue()); |
| } else { |
| throw new IllegalArgumentException("Invalid state value"); |
| } |
| store.put(key, data); |
| } catch (Throwable e) { |
| log.error("Error while putFunctionState request @ /{}/{}/{}/{}", |
| tenant, namespace, functionName, key, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| @Override |
| public void uploadFunction(final InputStream uploadedInputStream, final String path, |
| AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| if (worker().getWorkerConfig().isAuthorizationEnabled() && !isSuperUser(authParams)) { |
| throw new RestException(Status.UNAUTHORIZED, "Client is not authorized to perform operation"); |
| } |
| |
| // validate parameters |
| try { |
| if (uploadedInputStream == null || path == null) { |
| throw new IllegalArgumentException("Function Package is not provided " + path); |
| } |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid upload function request @ /{}", path, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| // Upload to bookkeeper |
| try { |
| log.info("Uploading function package to {}", path); |
| if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { |
| File tempFile = createPkgTempFile(); |
| tempFile.deleteOnExit(); |
| FileOutputStream out = new FileOutputStream(tempFile); |
| IOUtils.copy(uploadedInputStream, out); |
| PackageMetadata metadata = PackageMetadata.builder().createTime(System.currentTimeMillis()).build(); |
| worker().getBrokerAdmin().packages().upload(metadata, path, tempFile.getAbsolutePath()); |
| } else { |
| WorkerUtils.uploadToBookKeeper(worker().getDlogNamespace(), uploadedInputStream, path); |
| } |
| } catch (IOException | PulsarAdminException e) { |
| log.error("Error uploading file {}", path, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| @Override |
| public StreamingOutput downloadFunction(String tenant, String namespace, String componentName, |
| AuthenticationParameters authParams, boolean transformFunction) { |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "download package for", |
| authParams); |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.error("{} does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, |
| componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| String pkgPath = transformFunction |
| ? functionMetaData.getTransformFunctionPackageLocation().getPackagePath() |
| : functionMetaData.getPackageLocation().getPackagePath(); |
| |
| FunctionDetails.ComponentType componentType = transformFunction |
| ? FunctionDetails.ComponentType.FUNCTION |
| : InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()); |
| |
| return getStreamingOutput(pkgPath, componentType); |
| } |
| |
| private StreamingOutput getStreamingOutput(String pkgPath) { |
| return getStreamingOutput(pkgPath, null); |
| } |
| |
| private StreamingOutput getStreamingOutput(String pkgPath, FunctionDetails.ComponentType componentType) { |
| return output -> { |
| if (pkgPath.startsWith(Utils.HTTP)) { |
| URL url = URI.create(pkgPath).toURL(); |
| try (InputStream inputStream = url.openStream()) { |
| IOUtils.copy(inputStream, output); |
| } |
| } else if (pkgPath.startsWith(Utils.FILE)) { |
| URI url = URI.create(pkgPath); |
| File file = new File(url.getPath()); |
| Files.copy(file.toPath(), output); |
| } else if (pkgPath.startsWith(Utils.BUILTIN) |
| && !worker().getWorkerConfig().getUploadBuiltinSinksSources()) { |
| Path narPath = getBuiltinArchivePath(pkgPath, componentType); |
| log.info("Loading {} from {}", pkgPath, narPath); |
| try (InputStream in = new FileInputStream(narPath.toString())) { |
| IOUtils.copy(in, output, 1024); |
| output.flush(); |
| } |
| } else if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { |
| try { |
| File file = downloadPackageFile(worker(), pkgPath); |
| try (InputStream in = new FileInputStream(file)) { |
| IOUtils.copy(in, output, 1024); |
| output.flush(); |
| } |
| } catch (Exception e) { |
| log.error("Failed download package {} from packageManagement Service", pkgPath, e); |
| |
| } |
| } else { |
| WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), output, pkgPath); |
| } |
| }; |
| } |
| |
| private Path getBuiltinArchivePath(String pkgPath, FunctionDetails.ComponentType componentType) { |
| String type = pkgPath.replaceFirst("^builtin://", ""); |
| if (!FunctionDetails.ComponentType.FUNCTION.equals(componentType)) { |
| Connector connector = worker().getConnectorsManager().getConnector(type); |
| if (connector != null) { |
| return connector.getArchivePath(); |
| } |
| if (componentType != null) { |
| throw new IllegalStateException("Didn't find " + type + " in built-in connectors"); |
| } |
| } |
| FunctionArchive function = worker().getFunctionsManager().getFunction(type); |
| if (function != null) { |
| return function.getArchivePath(); |
| } |
| if (componentType != null) { |
| throw new IllegalStateException("Didn't find " + type + " in built-in functions"); |
| } |
| throw new IllegalStateException("Didn't find " + type + " in built-in connectors or functions"); |
| } |
| |
| @Override |
| public StreamingOutput downloadFunction(final String path, final AuthenticationParameters authParams) { |
| |
| if (!isWorkerServiceAvailable()) { |
| throwUnavailableException(); |
| } |
| |
| if (worker().getWorkerConfig().isAuthorizationEnabled()) { |
| // to maintain backwards compatibility but still have authorization |
| String[] tokens = path.split("/"); |
| if (tokens.length == 4) { |
| String tenant = tokens[0]; |
| String namespace = tokens[1]; |
| String componentName = tokens[2]; |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "download package for", |
| authParams); |
| } else { |
| if (!isSuperUser(authParams)) { |
| throw new RestException(Status.UNAUTHORIZED, "Client is not authorized to perform operation"); |
| } |
| } |
| } |
| |
| return getStreamingOutput(path); |
| } |
| |
| private void validateListFunctionRequestParams(final String tenant, final String namespace) |
| throws IllegalArgumentException { |
| |
| if (tenant == null) { |
| throw new IllegalArgumentException("Tenant is not provided"); |
| } |
| if (namespace == null) { |
| throw new IllegalArgumentException("Namespace is not provided"); |
| } |
| } |
| |
| protected void validateGetFunctionInstanceRequestParams(final String tenant, |
| final String namespace, |
| final String componentName, |
| final FunctionDetails.ComponentType componentType, |
| final String instanceId) throws IllegalArgumentException { |
| validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); |
| if (instanceId == null) { |
| throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType)); |
| } |
| } |
| |
| protected void validateGetFunctionRequestParams(String tenant, String namespace, String subject, |
| FunctionDetails.ComponentType componentType) |
| throws IllegalArgumentException { |
| |
| if (tenant == null) { |
| throw new IllegalArgumentException("Tenant is not provided"); |
| } |
| if (namespace == null) { |
| throw new IllegalArgumentException("Namespace is not provided"); |
| } |
| if (subject == null) { |
| throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " name is not provided"); |
| } |
| } |
| |
| private void validateDeregisterRequestParams(String tenant, String namespace, String subject, |
| FunctionDetails.ComponentType componentType) |
| throws IllegalArgumentException { |
| |
| if (tenant == null) { |
| throw new IllegalArgumentException("Tenant is not provided"); |
| } |
| if (namespace == null) { |
| throw new IllegalArgumentException("Namespace is not provided"); |
| } |
| if (subject == null) { |
| throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " name is not provided"); |
| } |
| } |
| |
| private void validateFunctionStateParams(final String tenant, |
| final String namespace, |
| final String functionName, |
| final String key) |
| throws IllegalArgumentException { |
| |
| if (tenant == null) { |
| throw new IllegalArgumentException("Tenant is not provided"); |
| } |
| if (namespace == null) { |
| throw new IllegalArgumentException("Namespace is not provided"); |
| } |
| if (functionName == null) { |
| throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " name is not provided"); |
| } |
| if (key == null) { |
| throw new IllegalArgumentException("Key is not provided"); |
| } |
| } |
| |
| private String getFunctionCodeBuiltin(FunctionDetails functionDetails, |
| FunctionDetails.ComponentType componentType) { |
| if (componentType == FunctionDetails.ComponentType.SOURCE && functionDetails.hasSource()) { |
| SourceSpec sourceSpec = functionDetails.getSource(); |
| if (!isEmpty(sourceSpec.getBuiltin())) { |
| return sourceSpec.getBuiltin(); |
| } |
| } |
| |
| if (componentType == FunctionDetails.ComponentType.SINK && functionDetails.hasSink()) { |
| SinkSpec sinkSpec = functionDetails.getSink(); |
| if (!isEmpty(sinkSpec.getBuiltin())) { |
| return sinkSpec.getBuiltin(); |
| } |
| } |
| |
| if (componentType == FunctionDetails.ComponentType.FUNCTION) { |
| if (!isEmpty(functionDetails.getBuiltin())) { |
| return functionDetails.getBuiltin(); |
| } |
| } |
| |
| return null; |
| } |
| |
| private void validateTriggerRequestParams(final String tenant, |
| final String namespace, |
| final String functionName, |
| final String topic, |
| final String input, |
| final InputStream uploadedInputStream) { |
| // Note : Checking topic is not required it can be null |
| |
| if (tenant == null) { |
| throw new IllegalArgumentException("Tenant is not provided"); |
| } |
| if (namespace == null) { |
| throw new IllegalArgumentException("Namespace is not provided"); |
| } |
| if (functionName == null) { |
| throw new IllegalArgumentException("Function name is not provided"); |
| } |
| if (uploadedInputStream == null && input == null) { |
| throw new IllegalArgumentException("Trigger Data is not provided"); |
| } |
| } |
| |
| private void throwStateStoreUnvailableResponse() { |
| throw new RestException(Status.SERVICE_UNAVAILABLE, |
| "State storage client is not done initializing. " + "Please try again in a little while."); |
| } |
| |
| public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) { |
| return String.format("%s/%s/%s/%s", tenant, namespace, Codec.encode(functionName), |
| getUniquePackageName(Codec.encode(fileName))); |
| } |
| |
| /** |
| * @deprecated use {@link #isAuthorizedRole(String, String, AuthenticationParameters)} instead. |
| */ |
| @Deprecated |
| public boolean isAuthorizedRole(String tenant, String namespace, String clientRole, |
| AuthenticationDataSource authenticationData) throws PulsarAdminException { |
| AuthenticationParameters authParams = AuthenticationParameters.builder().clientRole(clientRole) |
| .clientAuthenticationDataSource(authenticationData).build(); |
| return isAuthorizedRole(tenant, namespace, authParams); |
| } |
| |
| public boolean isAuthorizedRole(String tenant, String namespace, |
| AuthenticationParameters authParams) throws PulsarAdminException { |
| if (worker().getWorkerConfig().isAuthorizationEnabled()) { |
| return allowFunctionOps(NamespaceName.get(tenant, namespace), authParams); |
| } else { |
| return true; |
| } |
| } |
| |
| public void throwRestExceptionIfUnauthorizedForNamespace(String tenant, String namespace, String componentName, |
| String action, AuthenticationParameters authParams) { |
| try { |
| if (!isAuthorizedRole(tenant, namespace, authParams)) { |
| log.warn("{}/{}/{} Client with role [{}] and originalPrincipal [{}] is not authorized to {} {}", |
| tenant, namespace, componentName, authParams.getClientRole(), |
| authParams.getOriginalPrincipal(), action, ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.UNAUTHORIZED, "Client is not authorized to perform operation"); |
| } |
| } catch (PulsarAdminException e) { |
| log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| @Deprecated |
| protected void componentStatusRequestValidate(final String tenant, final String namespace, |
| final String componentName, |
| final String clientRole, |
| final AuthenticationDataSource clientAuthenticationDataHttps) { |
| AuthenticationParameters authParams = AuthenticationParameters.builder().clientRole(clientRole) |
| .clientAuthenticationDataSource(clientAuthenticationDataHttps).build(); |
| componentStatusRequestValidate(tenant, namespace, componentName, authParams); |
| } |
| |
| protected void componentStatusRequestValidate(final String tenant, final String namespace, |
| final String componentName, |
| final AuthenticationParameters authParams) { |
| if (!isWorkerServiceAvailable()) { |
| throw new RestException(Status.SERVICE_UNAVAILABLE, |
| "Function worker service is not done initializing. Please try again in a little while."); |
| } |
| |
| throwRestExceptionIfUnauthorizedForNamespace(tenant, namespace, componentName, "get status for", |
| authParams); |
| |
| // validate parameters |
| try { |
| validateGetFunctionRequestParams(tenant, namespace, componentName, componentType); |
| } catch (IllegalArgumentException e) { |
| log.error("Invalid get {} Status request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, |
| namespace, componentName, e); |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) { |
| log.warn("{} in get {} Status does not exist @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), |
| componentType, tenant, namespace, componentName); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| if (!InstanceUtils.calculateSubjectType(functionMetaData.getFunctionDetails()).equals(componentType)) { |
| log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, |
| ComponentTypeUtils.toString(componentType)); |
| throw new RestException(Status.NOT_FOUND, |
| String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName)); |
| } |
| } |
| |
| @Deprecated |
| protected void componentInstanceStatusRequestValidate(final String tenant, |
| final String namespace, |
| final String componentName, |
| final int instanceId, |
| final String clientRole, |
| final AuthenticationDataSource clientAuthenticationDataHttps) { |
| AuthenticationParameters authParams = AuthenticationParameters.builder().clientRole(clientRole) |
| .clientAuthenticationDataSource(clientAuthenticationDataHttps).build(); |
| componentInstanceStatusRequestValidate(tenant, namespace, componentName, instanceId, authParams); |
| } |
| |
| protected void componentInstanceStatusRequestValidate(final String tenant, |
| final String namespace, |
| final String componentName, |
| final int instanceId, |
| final AuthenticationParameters authParams) { |
| |
| componentStatusRequestValidate(tenant, namespace, componentName, authParams); |
| |
| FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); |
| FunctionMetaData functionMetaData = |
| functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName); |
| int parallelism = functionMetaData.getFunctionDetails().getParallelism(); |
| if (instanceId < 0 || instanceId >= parallelism) { |
| log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", |
| ComponentTypeUtils.toString(componentType), tenant, namespace, componentName); |
| throw new RestException(Status.BAD_REQUEST, |
| String.format("%s %s doesn't have instance with id %s", ComponentTypeUtils.toString(componentType), |
| componentName, instanceId)); |
| } |
| } |
| |
| public boolean isSuperUser(AuthenticationParameters authParams) { |
| if (authParams.getClientRole() != null) { |
| try { |
| return worker().getAuthorizationService().isSuperUser(authParams) |
| .get(worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); |
| } catch (InterruptedException e) { |
| log.warn("Time-out {} sec while checking the role {} originalPrincipal {} is a super user role ", |
| worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), |
| authParams.getClientRole(), authParams.getOriginalPrincipal()); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } catch (Exception e) { |
| log.warn("Failed verifying role {} originalPrincipal {} is a super user role", |
| authParams.getClientRole(), authParams.getOriginalPrincipal(), e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * @deprecated use {@link #isSuperUser(AuthenticationParameters)} |
| */ |
| @Deprecated |
| public boolean isSuperUser(String clientRole, AuthenticationDataSource authenticationData) { |
| AuthenticationParameters authParams = AuthenticationParameters.builder().clientRole(clientRole) |
| .clientAuthenticationDataSource(authenticationData).build(); |
| return isSuperUser(authParams); |
| } |
| |
| /** |
| * @deprecated use {@link #isSuperUser(AuthenticationParameters)} |
| */ |
| @Deprecated |
| public boolean allowFunctionOps(NamespaceName namespaceName, String role, |
| AuthenticationDataSource authenticationData) { |
| AuthenticationParameters authParams = AuthenticationParameters.builder().clientRole(role) |
| .clientAuthenticationDataSource(authenticationData).build(); |
| return allowFunctionOps(namespaceName, authParams); |
| } |
| |
| public boolean allowFunctionOps(NamespaceName namespaceName, AuthenticationParameters authParams) { |
| try { |
| switch (componentType) { |
| case SINK: |
| return worker().getAuthorizationService().allowSinkOpsAsync(namespaceName, authParams) |
| .get(worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); |
| case SOURCE: |
| return worker().getAuthorizationService().allowSourceOpsAsync(namespaceName, authParams) |
| .get(worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); |
| case FUNCTION: |
| default: |
| return worker().getAuthorizationService().allowFunctionOpsAsync(namespaceName, authParams) |
| .get(worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), SECONDS); |
| } |
| } catch (InterruptedException e) { |
| log.warn("Time-out {} sec while checking function authorization on {} ", |
| worker().getWorkerConfig().getMetadataStoreOperationTimeoutSeconds(), namespaceName); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } catch (Exception e) { |
| log.warn("Admin-client with Role [{}] originalPrincipal [{}] failed to get function permissions for " |
| + "namespace - {}. {}", authParams.getClientRole(), |
| authParams.getOriginalPrincipal(), namespaceName, e.getMessage(), e); |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } |
| } |
| |
| private void internalProcessFunctionRequest(final String tenant, final String namespace, final String functionName, |
| final FunctionMetaData functionMetadata, boolean delete, |
| String errorMsg) { |
| try { |
| if (worker().getLeaderService().isLeader()) { |
| worker().getFunctionMetaDataManager().updateFunctionOnLeader(functionMetadata, delete); |
| } else { |
| FunctionsImpl functions = (FunctionsImpl) worker().getFunctionAdmin().functions(); |
| functions.updateOnWorkerLeader(tenant, |
| namespace, functionName, functionMetadata.toByteArray(), delete); |
| } |
| } catch (PulsarAdminException e) { |
| log.error(errorMsg, e); |
| throw new RestException(e.getStatusCode(), e.getMessage()); |
| } catch (IllegalStateException e) { |
| throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage()); |
| } catch (IllegalArgumentException e) { |
| throw new RestException(Status.BAD_REQUEST, e.getMessage()); |
| } |
| } |
| |
| protected ClassLoader getClassLoaderFromPackage(String className, |
| File packageFile, |
| String narExtractionDirectory) { |
| return FunctionCommon.getClassLoaderFromPackage(componentType, className, packageFile, narExtractionDirectory); |
| } |
| |
| static File downloadPackageFile(PulsarWorkerService worker, String packageName) |
| throws IOException, PulsarAdminException { |
| Path tempDirectory; |
| if (worker.getWorkerConfig().getDownloadDirectory() != null) { |
| tempDirectory = Paths.get(worker.getWorkerConfig().getDownloadDirectory()); |
| } else { |
| // use the Nar extraction directory as a temporary directory for downloaded files |
| tempDirectory = Paths.get(worker.getWorkerConfig().getNarExtractionDirectory()); |
| } |
| Files.createDirectories(tempDirectory); |
| File file = Files.createTempFile(tempDirectory, "function", ".tmp").toFile(); |
| worker.getBrokerAdmin().packages().download(packageName, file.toString()); |
| return file; |
| } |
| |
| protected File getPackageFile(String functionPkgUrl, String existingPackagePath, InputStream uploadedInputStream) |
| throws IOException, PulsarAdminException { |
| File componentPackageFile = null; |
| if (isNotBlank(functionPkgUrl)) { |
| componentPackageFile = getPackageFile(functionPkgUrl); |
| } else if (existingPackagePath.startsWith(Utils.FILE) || existingPackagePath.startsWith(Utils.HTTP)) { |
| try { |
| componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingPackagePath); |
| } catch (Exception e) { |
| throw new IllegalArgumentException(String.format("Encountered error \"%s\" " |
| + "when getting %s package from %s", e.getMessage(), |
| ComponentTypeUtils.toString(componentType), functionPkgUrl)); |
| } |
| } else if (uploadedInputStream != null) { |
| componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream); |
| } else if (!existingPackagePath.startsWith(Utils.BUILTIN)) { |
| componentPackageFile = FunctionCommon.createPkgTempFile(); |
| componentPackageFile.deleteOnExit(); |
| if (worker().getWorkerConfig().isFunctionsWorkerEnablePackageManagement()) { |
| worker().getBrokerAdmin().packages().download( |
| existingPackagePath, |
| componentPackageFile.getAbsolutePath()); |
| } else { |
| WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), |
| componentPackageFile, existingPackagePath); |
| } |
| } |
| return componentPackageFile; |
| } |
| |
| protected File downloadPackageFile(String packageName) throws IOException, PulsarAdminException { |
| return downloadPackageFile(worker(), packageName); |
| } |
| |
| protected File getPackageFile(String functionPkgUrl) throws IOException, PulsarAdminException { |
| if (Utils.hasPackageTypePrefix(functionPkgUrl)) { |
| return downloadPackageFile(functionPkgUrl); |
| } else { |
| if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) { |
| throw new IllegalArgumentException("Function Package url is not valid." |
| + "supported url (http/https/file)"); |
| } |
| try { |
| return FunctionCommon.extractFileFromPkgURL(functionPkgUrl); |
| } catch (Exception e) { |
| throw new IllegalArgumentException(String.format("Encountered error \"%s\" " |
| + "when getting %s package from %s", e.getMessage(), |
| ComponentTypeUtils.toString(componentType), functionPkgUrl), e); |
| } |
| } |
| } |
| |
| protected ClassLoader getBuiltinFunctionClassLoader(String archive) { |
| if (!StringUtils.isEmpty(archive)) { |
| if (archive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) { |
| archive = archive.replaceFirst("^builtin://", ""); |
| |
| FunctionArchive function = worker().getFunctionsManager().getFunction(archive); |
| // check if builtin connector exists |
| if (function == null) { |
| throw new IllegalArgumentException("Built-in " + componentType + " is not available"); |
| } |
| return function.getClassLoader(); |
| } |
| } |
| return null; |
| } |
| } |