blob: 69f8b3814c3ebcd18bbe8ed85ebe58e87a5acc38 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.worker.rest.api;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.service.api.Functions;
import org.apache.pulsar.functions.worker.service.api.FunctionsV2;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@Slf4j
public class FunctionsImplV2 implements FunctionsV2<PulsarWorkerService> {
private final Functions<PulsarWorkerService> delegate;
public FunctionsImplV2(Supplier<PulsarWorkerService> workerServiceSupplier) {
this.delegate = new FunctionsImpl(workerServiceSupplier);
}
// For test purposes
public FunctionsImplV2(FunctionsImpl delegate) {
this.delegate = delegate;
}
@Override
public Response getFunctionInfo(final String tenant, final String namespace, final String functionName, String clientRole)
throws IOException {
// run just for parameter checks
delegate.getFunctionInfo(tenant, namespace, functionName, clientRole, null);
FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager();
Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
functionName);
String functionDetailsJson = FunctionCommon.printJson(functionMetaData.getFunctionDetails());
return Response.status(Response.Status.OK).entity(functionDetailsJson).build();
}
@Override
public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
final String instanceId, URI uri, String clientRole) throws IOException {
org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, clientRole, null);
String jsonResponse = FunctionCommon.printJson(toProto(functionInstanceStatus, instanceId));
return Response.status(Response.Status.OK).entity(jsonResponse).build();
}
@Override
public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri, String clientRole) throws
IOException {
FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri, clientRole, null);
InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
toProto(functionInstanceStatus.getStatus(),
String.valueOf(functionInstanceStatus.getInstanceId()))));
String jsonResponse = FunctionCommon.printJson(functionStatusList);
return Response.status(Response.Status.OK).entity(jsonResponse).build();
}
@Override
public Response registerFunction(String tenant, String namespace, String functionName, InputStream
uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String
functionDetailsJson, String clientRole) {
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
try {
FunctionCommon.mergeJson(functionDetailsJson, functionDetailsBuilder);
} catch (IOException e) {
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
FunctionConfig functionConfig = FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionConfig, clientRole, null);
return Response.ok().build();
}
@Override
public Response updateFunction(String tenant, String namespace, String functionName, InputStream uploadedInputStream,
FormDataContentDisposition fileDetail, String functionPkgUrl, String
functionDetailsJson, String clientRole) {
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
try {
FunctionCommon.mergeJson(functionDetailsJson, functionDetailsBuilder);
} catch (IOException e) {
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
FunctionConfig functionConfig = FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionConfig, clientRole, null, null);
return Response.ok().build();
}
@Override
public Response deregisterFunction(String tenant, String namespace, String functionName, String clientAppId) {
delegate.deregisterFunction(tenant, namespace, functionName, clientAppId, null);
return Response.ok().build();
}
@Override
public Response listFunctions(String tenant, String namespace, String clientRole) {
Collection<String> functionStateList = delegate.listFunctions( tenant, namespace, clientRole, null);
return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
}
@Override
public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
InputStream triggerStream, String topic, String clientRole) {
String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientRole, null);
return Response.status(Response.Status.OK).entity(result).build();
}
@Override
public Response getFunctionState(String tenant, String namespace, String functionName, String key, String clientRole) {
FunctionState functionState = delegate.getFunctionState(
tenant, namespace, functionName, key, clientRole, null);
String value;
if (functionState.getNumberValue() != null) {
value = "value : " + functionState.getNumberValue() + ", version : " + functionState.getVersion();
} else {
value = "value : " + functionState.getStringValue() + ", version : " + functionState.getVersion();
}
return Response.status(Response.Status.OK)
.entity(value)
.build();
}
@Override
public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
uri, String clientRole) {
delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole, null);
return Response.ok().build();
}
@Override
public Response restartFunctionInstances(String tenant, String namespace, String functionName, String clientRole) {
delegate.restartFunctionInstances(tenant, namespace, functionName, clientRole, null);
return Response.ok().build();
}
@Override
public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
uri, String clientRole) {
delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, clientRole, null);
return Response.ok().build();
}
@Override
public Response stopFunctionInstances(String tenant, String namespace, String functionName, String clientRole) {
delegate.stopFunctionInstances(tenant, namespace, functionName, clientRole, null);
return Response.ok().build();
}
@Override
public Response uploadFunction(InputStream uploadedInputStream, String path, String clientRole) {
delegate.uploadFunction(uploadedInputStream, path, clientRole, null);
return Response.ok().build();
}
@Override
public Response downloadFunction(String path, String clientRole) {
return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path, clientRole, null)).build();
}
@Override
public List<ConnectorDefinition> getListOfConnectors() {
return delegate.getListOfConnectors();
}
private InstanceCommunication.FunctionStatus toProto(
org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
functionInstanceStatus, String instanceId) {
List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSysExceptions
= functionInstanceStatus.getLatestSystemExceptions()
.stream()
.map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
.setExceptionString(exceptionInformation.getExceptionString())
.setMsSinceEpoch(exceptionInformation.getTimestampMs())
.build())
.collect(Collectors.toList());
List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions
= functionInstanceStatus.getLatestUserExceptions()
.stream()
.map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
.setExceptionString(exceptionInformation.getExceptionString())
.setMsSinceEpoch(exceptionInformation.getTimestampMs())
.build())
.collect(Collectors.toList());
InstanceCommunication.FunctionStatus functionStatus = InstanceCommunication.FunctionStatus.newBuilder()
.setRunning(functionInstanceStatus.isRunning())
.setFailureException(functionInstanceStatus.getError())
.setNumRestarts(functionInstanceStatus.getNumRestarts())
.setNumSuccessfullyProcessed(functionInstanceStatus.getNumSuccessfullyProcessed())
.setNumUserExceptions(functionInstanceStatus.getNumUserExceptions())
.addAllLatestUserExceptions(latestUserExceptions)
.setNumSystemExceptions(functionInstanceStatus.getNumSystemExceptions())
.addAllLatestSystemExceptions(latestSysExceptions)
.setAverageLatency(functionInstanceStatus.getAverageLatency())
.setLastInvocationTime(functionInstanceStatus.getLastInvocationTime())
.setInstanceId(instanceId)
.setWorkerId(delegate.worker().getWorkerConfig().getWorkerId())
.build();
return functionStatus;
}
}