// blob: 9c1575fc8ed4d0ddb621ed2faaa8fd5952a3eb40 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.api.Functions;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
public class FunctionsBase extends AdminResource {
/**
 * Looks up the {@link Functions} component through the worker service of this broker.
 *
 * @return the value of {@code pulsar().getWorkerService().getFunctions()}
 */
Functions<? extends WorkerService> functions() {
    final WorkerService workerService = pulsar().getWorkerService();
    return workerService.getFunctions();
}
/**
 * REST endpoint that registers a Pulsar Function.
 *
 * <p>All arguments are forwarded unchanged to {@code functions().registerFunction(...)},
 * followed by the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant              value of the {@code tenant} path segment
 * @param namespace           value of the {@code namespace} path segment
 * @param functionName        value of the {@code functionName} path segment
 * @param uploadedInputStream content of the {@code data} form part
 * @param fileDetail          content disposition of the {@code data} form part
 * @param functionPkgUrl      value of the {@code url} form part
 * @param functionConfig      value of the {@code functionConfig} form part
 */
@POST
@ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 400, message = "Invalid request (The Pulsar Function already exists, etc.)"),
        @ApiResponse(code = 408, message = "Request timeout"),
        @ApiResponse(code = 200, message = "Pulsar Function successfully created")
})
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void registerFunction(
        @ApiParam(value = "The tenant of a Pulsar Function")
        final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function")
        final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function")
        final @PathParam("functionName") String functionName,
        final @FormDataParam("data") InputStream uploadedInputStream,
        final @FormDataParam("data") FormDataContentDisposition fileDetail,
        final @FormDataParam("url") String functionPkgUrl,
        @ApiParam(
                value = "A JSON value presenting configuration payload of a Pulsar Function."
                        + " An example of the expected Pulsar Function can be found here.\n"
                        + "- **autoAck**\n"
                        + " Whether or not the framework acknowledges messages automatically.\n"
                        + "- **runtime**\n"
                        + " What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]\n"
                        + "- **resources**\n"
                        + " The size of the system resources allowed by the Pulsar Function runtime."
                        + " The resources include: cpu, ram, disk.\n"
                        + "- **className**\n"
                        + " The class name of a Pulsar Function.\n"
                        + "- **customSchemaInputs**\n"
                        + " The map of input topics to Schema class names (specified as a JSON object).\n"
                        + "- **customSerdeInputs**\n"
                        + " The map of input topics to SerDe class names (specified as a JSON object).\n"
                        + "- **deadLetterTopic**\n"
                        + " Messages that are not processed successfully are sent to `deadLetterTopic`.\n"
                        + "- **runtimeFlags**\n"
                        + " Any flags that you want to pass to the runtime."
                        + " Note that in thread mode, these flags have no impact.\n"
                        + "- **fqfn**\n"
                        + " The Fully Qualified Function Name (FQFN) for the Pulsar Function.\n"
                        + "- **inputSpecs**\n"
                        + " The map of input topics to its consumer configuration,"
                        + " each configuration has schema of "
                        + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
                        + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
                        + "- **inputs**\n"
                        + " The input topic or topics (multiple topics can be specified as"
                        + " a comma-separated list) of a Pulsar Function.\n"
                        + "- **jar**\n"
                        + " Path to the JAR file for the Pulsar Function"
                        + " (if the Pulsar Function is written in Java). "
                        + " It also supports URL path [http/https/file (file protocol assumes that file "
                        + " already exists on worker host)] from which worker can download the package.\n"
                        + "- **py**\n"
                        + " Path to the main Python file or Python wheel file for the"
                        + " Pulsar Function (if the Pulsar Function is written in Python).\n"
                        + "- **go**\n"
                        + " Path to the main Go executable binary for the Pulsar Function"
                        + " (if the Pulsar Function is written in Go).\n"
                        + "- **logTopic**\n"
                        + " The topic to which the logs of a Pulsar Function are produced.\n"
                        + "- **maxMessageRetries**\n"
                        + " How many times should we try to process a message before giving up.\n"
                        + "- **output**\n"
                        + " The output topic of a Pulsar Function"
                        + " (If none is specified, no output is written).\n"
                        + "- **outputSerdeClassName**\n"
                        + " The SerDe class to be used for messages output by the Pulsar Function.\n"
                        + "- **parallelism**\n"
                        + " The parallelism factor of a Pulsar Function"
                        + " (i.e. the number of a Pulsar Function instances to run).\n"
                        + "- **processingGuarantees**\n"
                        + " The processing guarantees (that is, delivery semantics)"
                        + " applied to the Pulsar Function."
                        + " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
                        + "- **retainOrdering**\n"
                        + " Function consumes and processes messages in order.\n"
                        + "- **outputSchemaType**\n"
                        + " Represents either a builtin schema type (for example: 'avro', 'json', etc.)"
                        // The trailing newline keeps the next bullet from being glued to this entry.
                        + " or the class name for a Schema implementation.\n"
                        + "- **subName**\n"
                        + " Pulsar source subscription name. User can specify a subscription-name"
                        + " for the input-topic consumer.\n"
                        + "- **windowConfig**\n"
                        + " The window configuration of a Pulsar Function.\n"
                        + "- **timeoutMs**\n"
                        + " The message timeout in milliseconds.\n"
                        + "- **topicsPattern**\n"
                        + " The topic pattern to consume from a list of topics under a namespace"
                        + " that match the pattern."
                        + " [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a "
                        + " pattern in customSerdeInputs (supported for java fun only)\n"
                        + "- **userConfig**\n"
                        + " A map of user-defined configurations (specified as a JSON object).\n"
                        + "- **secrets**\n"
                        + " This is a map of secretName(that is how the secret is going to be accessed"
                        + " in the Pulsar Function via context) to an object that"
                        + " encapsulates how the secret is fetched by the underlying secrets provider."
                        + " The type of a value here can be found by the"
                        + " SecretProviderConfigurator.getSecretObjectType() method. \n"
                        + "- **cleanupSubscription**\n"
                        + " Whether the subscriptions of a Pulsar Function created or used should be deleted"
                        + " when the Pulsar Function is deleted.\n",
                examples = @Example(
                        value = @ExampleProperty(
                                mediaType = MediaType.APPLICATION_JSON,
                                // Well-formed JSON whose keys match the field names documented above.
                                value = "{\n"
                                        + "  \"inputs\": [\"persistent://public/default/input-topic\"],\n"
                                        + "  \"parallelism\": 4,\n"
                                        + "  \"output\": \"persistent://public/default/output-topic\",\n"
                                        + "  \"logTopic\": \"persistent://public/default/log-topic\",\n"
                                        + "  \"className\": \"org.example.test.ExclamationFunction\",\n"
                                        + "  \"jar\": \"java-function-1.0-SNAPSHOT.jar\"\n"
                                        + "}\n"
                        )
                )
        )
        final @FormDataParam("functionConfig") FunctionConfig functionConfig) {
    functions().registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
            functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that updates a Pulsar Function.
 *
 * <p>All arguments are forwarded to {@code functions().updateFunction(...)}; the values of
 * {@code clientAppId()} and {@code clientAuthData()} are inserted before {@code updateOptions}.
 *
 * @param tenant              value of the {@code tenant} path segment
 * @param namespace           value of the {@code namespace} path segment
 * @param functionName        value of the {@code functionName} path segment
 * @param uploadedInputStream content of the {@code data} form part
 * @param fileDetail          content disposition of the {@code data} form part
 * @param functionPkgUrl      value of the {@code url} form part
 * @param functionConfig      value of the {@code functionConfig} form part
 * @param updateOptions       value of the {@code updateOptions} form part
 * @throws IOException if the delegated call fails with an I/O error
 */
@PUT
@ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 400, message = "Invalid request (The Pulsar Function doesn't exist, etc.)"),
        @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
})
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void updateFunction(
        @ApiParam(value = "The tenant of a Pulsar Function")
        final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function")
        final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function")
        final @PathParam("functionName") String functionName,
        final @FormDataParam("data") InputStream uploadedInputStream,
        final @FormDataParam("data") FormDataContentDisposition fileDetail,
        final @FormDataParam("url") String functionPkgUrl,
        @ApiParam(
                value = "A JSON value presenting configuration payload of a Pulsar Function."
                        + " An example of the expected Pulsar Function can be found here.\n"
                        + "- **autoAck**\n"
                        + " Whether or not the framework acknowledges messages automatically.\n"
                        + "- **runtime**\n"
                        + " What is the runtime of the Pulsar Function. Possible Values: [JAVA, PYTHON, GO]\n"
                        + "- **resources**\n"
                        + " The size of the system resources allowed by the Pulsar Function runtime."
                        + " The resources include: cpu, ram, disk.\n"
                        + "- **className**\n"
                        + " The class name of a Pulsar Function.\n"
                        + "- **customSchemaInputs**\n"
                        + " The map of input topics to Schema class names (specified as a JSON object).\n"
                        + "- **customSerdeInputs**\n"
                        + " The map of input topics to SerDe class names (specified as a JSON object).\n"
                        + "- **deadLetterTopic**\n"
                        + " Messages that are not processed successfully are sent to `deadLetterTopic`.\n"
                        + "- **runtimeFlags**\n"
                        + " Any flags that you want to pass to the runtime."
                        + " Note that in thread mode, these flags have no impact.\n"
                        + "- **fqfn**\n"
                        + " The Fully Qualified Function Name (FQFN) for the Pulsar Function.\n"
                        + "- **inputSpecs**\n"
                        + " The map of input topics to its consumer configuration,"
                        + " each configuration has schema of "
                        + " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\","
                        + " \"isRegexPattern\": true, \"receiverQueueSize\": 5}\n"
                        + "- **inputs**\n"
                        + " The input topic or topics (multiple topics can be specified as"
                        + " a comma-separated list) of a Pulsar Function.\n"
                        + "- **jar**\n"
                        + " Path to the JAR file for the Pulsar Function"
                        + " (if the Pulsar Function is written in Java). "
                        + " It also supports URL path [http/https/file (file protocol assumes that file "
                        + " already exists on worker host)] from which worker can download the package.\n"
                        + "- **py**\n"
                        + " Path to the main Python file or Python wheel file for the Pulsar Function"
                        + " (if the Pulsar Function is written in Python).\n"
                        + "- **go**\n"
                        + " Path to the main Go executable binary for the Pulsar Function"
                        + " (if the Pulsar Function is written in Go).\n"
                        + "- **logTopic**\n"
                        + " The topic to which the logs of a Pulsar Function are produced.\n"
                        + "- **maxMessageRetries**\n"
                        + " How many times should we try to process a message before giving up.\n"
                        + "- **output**\n"
                        + " The output topic of a Pulsar Function (If none is specified, no output is written).\n"
                        + "- **outputSerdeClassName**\n"
                        + " The SerDe class to be used for messages output by the Pulsar Function.\n"
                        + "- **parallelism**\n"
                        + " The parallelism factor of a Pulsar Function "
                        + "(i.e. the number of a Pulsar Function instances to run).\n"
                        + "- **processingGuarantees**\n"
                        + " The processing guarantees (that is, delivery semantics)"
                        + " applied to the Pulsar Function."
                        + " Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]\n"
                        + "- **retainOrdering**\n"
                        + " Function consumes and processes messages in order.\n"
                        + "- **outputSchemaType**\n"
                        + " Represents either a builtin schema type (for example: 'avro', 'json', etc.)"
                        // The trailing newline keeps the next bullet from being glued to this entry.
                        + " or the class name for a Schema implementation.\n"
                        + "- **subName**\n"
                        + " Pulsar source subscription name. User can specify"
                        + " a subscription-name for the input-topic consumer.\n"
                        + "- **windowConfig**\n"
                        + " The window configuration of a Pulsar Function.\n"
                        + "- **timeoutMs**\n"
                        + " The message timeout in milliseconds.\n"
                        + "- **topicsPattern**\n"
                        + " The topic pattern to consume from a list of topics"
                        + " under a namespace that match the pattern."
                        + " [input] and [topic-pattern] are mutually exclusive. Add SerDe class name for a "
                        + " pattern in customSerdeInputs (supported for java fun only)\n"
                        + "- **userConfig**\n"
                        + " A map of user-defined configurations (specified as a JSON object).\n"
                        + "- **secrets**\n"
                        + " This is a map of secretName(that is how the secret is going to be accessed"
                        + " in the Pulsar Function via context) to an object that"
                        + " encapsulates how the secret is fetched by the underlying secrets provider."
                        + " The type of a value here can be found by the"
                        + " SecretProviderConfigurator.getSecretObjectType() method. \n"
                        + "- **cleanupSubscription**\n"
                        + " Whether the subscriptions of a Pulsar Function created or used"
                        + " should be deleted when the Pulsar Function is deleted.\n",
                examples = @Example(
                        value = @ExampleProperty(
                                mediaType = MediaType.APPLICATION_JSON,
                                // Well-formed JSON whose keys match the field names documented above.
                                value = "{\n"
                                        + "  \"inputs\": [\"persistent://public/default/input-topic\"],\n"
                                        + "  \"parallelism\": 4,\n"
                                        + "  \"output\": \"persistent://public/default/output-topic\",\n"
                                        + "  \"logTopic\": \"persistent://public/default/log-topic\",\n"
                                        + "  \"className\": \"org.example.test.ExclamationFunction\",\n"
                                        + "  \"jar\": \"java-function-1.0-SNAPSHOT.jar\"\n"
                                        + "}\n"
                        )
                )
        )
        final @FormDataParam("functionConfig") FunctionConfig functionConfig,
        @ApiParam(value = "The update options is for the Pulsar Function that needs to be updated.")
        final @FormDataParam("updateOptions") UpdateOptions updateOptions) throws IOException {
    functions().updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
            functionPkgUrl, functionConfig, clientAppId(), clientAuthData(), updateOptions);
}
/**
 * REST endpoint that deletes a Pulsar Function by delegating to
 * {@code functions().deregisterFunction(...)} with the path parameters and the values of
 * {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 */
@DELETE
@ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function doesn't exist"),
        @ApiResponse(code = 408, message = "Request timeout"),
        @ApiResponse(code = 200, message = "The Pulsar Function was successfully deleted")
})
@Path("/{tenant}/{namespace}/{functionName}")
public void deregisterFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName) {
    final Functions<? extends WorkerService> delegate = functions();
    delegate.deregisterFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the {@link FunctionConfig} obtained from
 * {@code functions().getFunctionInfo(...)} for the addressed function.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @return the configuration returned by the delegate
 * @throws IOException if the delegated call fails with an I/O error
 */
@GET
@ApiOperation(
        value = "Fetches information about a Pulsar Function currently running in cluster mode",
        response = FunctionConfig.class
)
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 408, message = "Request timeout"),
        @ApiResponse(code = 404, message = "The Pulsar Function doesn't exist")
})
@Path("/{tenant}/{namespace}/{functionName}")
public FunctionConfig getFunctionInfo(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName)
        throws IOException {
    final Functions<? extends WorkerService> delegate = functions();
    return delegate.getFunctionInfo(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the status of one function instance by delegating to
 * {@code functions().getFunctionInstanceStatus(...)} with the path parameters, the current
 * request URI and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @param instanceId   value of the {@code instanceId} path segment
 * @return the instance status data returned by the delegate
 * @throws IOException if the delegated call fails with an I/O error
 */
@GET
@ApiOperation(
        value = "Displays the status of a Pulsar Function instance",
        response = FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class
)
@ApiResponses(value = {
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this function"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 404, message = "The Pulsar Function doesn't exist")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
        // Description fixed: closes the parenthesis and refers to status (not stats) on this endpoint.
        @ApiParam(value = "The instanceId of a Pulsar Function (if instance-id is not provided,"
                + " the status of all instances is returned)") final @PathParam("instanceId")
        String instanceId) throws IOException {
    return functions().getFunctionInstanceStatus(tenant, namespace, functionName,
            instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the {@link FunctionStatus} obtained from
 * {@code functions().getFunctionStatus(...)}, passing the path parameters, the current
 * request URI and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @return the status returned by the delegate
 * @throws IOException if the delegated call fails with an I/O error
 */
@GET
@ApiOperation(
        value = "Displays the status of a Pulsar Function",
        response = FunctionStatus.class
)
@ApiResponses(value = {
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this function"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 404, message = "The Pulsar Function doesn't exist")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/status")
public FunctionStatus getFunctionStatus(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName)
        throws IOException {
    final Functions<? extends WorkerService> delegate = functions();
    return delegate.getFunctionStatus(
            tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the {@link FunctionStats} obtained from
 * {@code functions().getFunctionStats(...)}, passing the path parameters, the current
 * request URI and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @return the stats returned by the delegate
 * @throws IOException if the delegated call fails with an I/O error
 */
@GET
@ApiOperation(
        value = "Displays the stats of a Pulsar Function",
        response = FunctionStats.class
)
@ApiResponses(value = {
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this function"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 404, message = "The Pulsar Function doesn't exist")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/stats")
public FunctionStats getFunctionStats(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName)
        throws IOException {
    final Functions<? extends WorkerService> delegate = functions();
    return delegate.getFunctionStats(
            tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the stats of one function instance by delegating to
 * {@code functions().getFunctionsInstanceStats(...)} with the path parameters, the current
 * request URI and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @param instanceId   value of the {@code instanceId} path segment
 * @return the instance stats data returned by the delegate
 * @throws IOException if the delegated call fails with an I/O error
 */
@GET
@ApiOperation(
        value = "Displays the stats of a Pulsar Function instance",
        response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
)
@ApiResponses(value = {
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this function"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 404, message = "The Pulsar Function doesn't exist")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
        // Description fixed: the parenthetical is now closed.
        @ApiParam(value = "The instanceId of a Pulsar Function"
                + " (if instance-id is not provided, the stats of all instances is returned)")
        final @PathParam("instanceId") String instanceId) throws IOException {
    return functions().getFunctionsInstanceStats(tenant, namespace, functionName, instanceId,
            uri.getRequestUri(), clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the list produced by {@code functions().listFunctions(...)}
 * for the given tenant and namespace.
 *
 * @param tenant    value of the {@code tenant} path segment
 * @param namespace value of the {@code namespace} path segment
 * @return the list of strings returned by the delegate
 */
@GET
@ApiOperation(
        value = "Lists all Pulsar Functions currently deployed in a given namespace",
        response = String.class,
        responseContainer = "Collection"
)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}")
public List<String> listFunctions(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace) {
    final Functions<? extends WorkerService> delegate = functions();
    return delegate.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that triggers a Pulsar Function by delegating to
 * {@code functions().triggerFunction(...)} with the path parameters, the submitted form
 * parts and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant        value of the {@code tenant} path segment
 * @param namespace     value of the {@code namespace} path segment
 * @param functionName  value of the {@code functionName} path segment
 * @param triggerValue  value of the {@code data} form part
 * @param triggerStream content of the {@code dataStream} form part
 * @param topic         value of the {@code topic} form part
 * @return the string returned by the delegate
 */
@POST
@ApiOperation(
        value = "Triggers a Pulsar Function with a user-specified value or file data",
        response = Message.class
)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 408, message = "Request timeout"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public String triggerFunction(
        @ApiParam(value = "The tenant of a Pulsar Function")
        final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function")
        final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function")
        final @PathParam("functionName") String functionName,
        @ApiParam(value = "The value with which you want to trigger the Pulsar Function")
        final @FormDataParam("data") String triggerValue,
        @ApiParam(value = "The path to the file that contains the data with"
                + " which you'd like to trigger the Pulsar Function")
        final @FormDataParam("dataStream") InputStream triggerStream,
        @ApiParam(value = "The specific topic name that the Pulsar Function"
                + " consumes from which you want to inject the data to")
        final @FormDataParam("topic") String topic) {
    final Functions<? extends WorkerService> delegate = functions();
    return delegate.triggerFunction(
            tenant, namespace, functionName, triggerValue, triggerStream, topic,
            clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the {@link FunctionState} obtained from
 * {@code functions().getFunctionState(...)} for the given key.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @param key          value of the {@code key} path segment
 * @return the state returned by the delegate
 */
@GET
@ApiOperation(
        value = "Fetch the current state associated with a Pulsar Function",
        response = FunctionState.class
)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 404, message = "The key does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/state/{key}")
public FunctionState getFunctionState(
        @ApiParam(value = "The tenant of a Pulsar Function")
        final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function")
        final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function")
        final @PathParam("functionName") String functionName,
        // Description fixed: this endpoint addresses a state key, not stats.
        @ApiParam(value = "The state key")
        final @PathParam("key") String key) {
    return functions().getFunctionState(tenant, namespace, functionName, key, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that stores function state by delegating to
 * {@code functions().putFunctionState(...)} with the path parameters, the submitted
 * {@code state} form part and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @param key          value of the {@code key} path segment
 * @param stateJson    value of the {@code state} form part
 */
@POST
@ApiOperation(value = "Put the state associated with a Pulsar Function")
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/state/{key}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void putFunctionState(
        final @PathParam("tenant") String tenant,
        final @PathParam("namespace") String namespace,
        final @PathParam("functionName") String functionName,
        final @PathParam("key") String key,
        final @FormDataParam("state") FunctionState stateJson) {
    final Functions<? extends WorkerService> delegate = functions();
    delegate.putFunctionState(
            tenant, namespace, functionName, key, stateJson, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that restarts one function instance by delegating to
 * {@code functions().restartFunctionInstance(...)} with the path parameters, the current
 * request URI and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @param instanceId   value of the {@code instanceId} path segment
 */
@POST
@ApiOperation(value = "Restart an instance of a Pulsar Function", response = Void.class)
@ApiResponses(value = {
        @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this function"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public void restartFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
        // Description fixed: the parenthetical is now closed.
        @ApiParam(value =
                "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are restarted)")
        final @PathParam("instanceId") String instanceId) {
    functions().restartFunctionInstance(tenant, namespace, functionName, instanceId,
            uri.getRequestUri(), clientAppId(), clientAuthData());
}
/**
 * REST endpoint that restarts all instances of a function by delegating to
 * {@code functions().restartFunctionInstances(...)} with the path parameters and the
 * values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 */
@POST
@ApiOperation(value = "Restart all instances of a Pulsar Function", response = Void.class)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public void restartFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName) {
    final Functions<? extends WorkerService> delegate = functions();
    delegate.restartFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that stops one function instance by delegating to
 * {@code functions().stopFunctionInstance(...)} with the path parameters, the current
 * request URI and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @param instanceId   value of the {@code instanceId} path segment
 */
@POST
@ApiOperation(value = "Stop an instance of a Pulsar Function", response = Void.class)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public void stopFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
        // Description fixed: parenthetical closed and stray trailing ". " removed.
        @ApiParam(value =
                "The instanceId of a Pulsar Function (if instance-id is not provided, all instances are stopped)")
        final @PathParam("instanceId") String instanceId) {
    functions().stopFunctionInstance(tenant, namespace, functionName, instanceId,
            uri.getRequestUri(), clientAppId(), clientAuthData());
}
/**
 * REST endpoint that stops all instances of a function by delegating to
 * {@code functions().stopFunctionInstances(...)} with the path parameters and the
 * values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 */
@POST
@ApiOperation(value = "Stop all instances of a Pulsar Function", response = Void.class)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/stop")
@Consumes(MediaType.APPLICATION_JSON)
public void stopFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName) {
    final Functions<? extends WorkerService> delegate = functions();
    delegate.stopFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that starts one function instance by delegating to
 * {@code functions().startFunctionInstance(...)} with the path parameters, the current
 * request URI and the values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @param instanceId   value of the {@code instanceId} path segment
 */
@POST
@ApiOperation(value = "Start an instance of a Pulsar Function", response = Void.class)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName,
        // Description fixed: "sre" typo corrected, parenthetical closed, stray trailing ". " removed.
        @ApiParam(value = "The instanceId of a Pulsar Function"
                + " (if instance-id is not provided, all instances are started)")
        final @PathParam("instanceId") String instanceId) {
    functions().startFunctionInstance(tenant, namespace, functionName, instanceId,
            uri.getRequestUri(), clientAppId(), clientAuthData());
}
/**
 * REST endpoint that starts all instances of a function by delegating to
 * {@code functions().startFunctionInstances(...)} with the path parameters and the
 * values of {@code clientAppId()} and {@code clientAuthData()}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 */
@POST
@ApiOperation(value = "Start all instances of a Pulsar Function", response = Void.class)
@ApiResponses(value = {
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 404, message = "The Pulsar Function does not exist"),
        @ApiResponse(code = 500, message = "Internal server error")
})
@Path("/{tenant}/{namespace}/{functionName}/start")
@Consumes(MediaType.APPLICATION_JSON)
public void startFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName) {
    final Functions<? extends WorkerService> delegate = functions();
    delegate.startFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
/**
 * REST endpoint (hidden from the generated API docs) that hands the uploaded stream and
 * target path to {@code functions().uploadFunction(...)} together with {@code clientAppId()}.
 *
 * @param uploadedInputStream content of the {@code data} form part
 * @param path                value of the {@code path} form part
 */
@POST
@ApiOperation(value = "Uploads Pulsar Function file data (Admin only)", hidden = true)
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void uploadFunction(
        final @FormDataParam("data") InputStream uploadedInputStream,
        final @FormDataParam("path") String path) {
    final Functions<? extends WorkerService> delegate = functions();
    delegate.uploadFunction(uploadedInputStream, path, clientAppId());
}
/**
 * REST endpoint (hidden from the generated API docs) that returns the
 * {@link StreamingOutput} produced by {@code functions().downloadFunction(path, ...)}.
 *
 * @param path value of the {@code path} query parameter
 * @return the streaming output returned by the delegate
 */
@GET
@ApiOperation(value = "Downloads Pulsar Function file data (Admin only)", hidden = true)
@Path("/download")
public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
    final Functions<? extends WorkerService> delegate = functions();
    return delegate.downloadFunction(path, clientAppId(), clientAuthData());
}
/**
 * REST endpoint (hidden from the generated API docs) that returns the
 * {@link StreamingOutput} produced by
 * {@code functions().downloadFunction(tenant, namespace, functionName, ...)}.
 *
 * @param tenant       value of the {@code tenant} path segment
 * @param namespace    value of the {@code namespace} path segment
 * @param functionName value of the {@code functionName} path segment
 * @return the streaming output returned by the delegate
 */
@GET
@ApiOperation(value = "Downloads Pulsar Function file data", hidden = true)
@Path("/{tenant}/{namespace}/{functionName}/download")
public StreamingOutput downloadFunction(
        @ApiParam(value = "The tenant of a Pulsar Function") final @PathParam("tenant") String tenant,
        @ApiParam(value = "The namespace of a Pulsar Function") final @PathParam("namespace") String namespace,
        @ApiParam(value = "The name of a Pulsar Function") final @PathParam("functionName") String functionName) {
    final Functions<? extends WorkerService> delegate = functions();
    return delegate.downloadFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
/**
 * REST endpoint that returns the connector definitions reported by
 * {@code functions().getListOfConnectors()}.
 *
 * @return the list of connector definitions returned by the delegate
 * @throws IOException if the delegated call fails with an I/O error
 * @deprecated in favor of moving endpoint to {@link org.apache.pulsar.broker.admin.v2.Worker}
 */
@GET
@ApiOperation(
        value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
        response = List.class
)
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 408, message = "Request timeout")
})
@Path("/connectors")
@Deprecated
public List<ConnectorDefinition> getConnectorsList() throws IOException {
    return functions().getListOfConnectors();
}
/**
 * REST endpoint (hidden from the generated API docs) that delegates to
 * {@code functions().updateFunctionOnWorkerLeader(...)} with the path parameters, the
 * submitted form parts, the current request URI and the value of {@code clientAppId()}.
 *
 * @param tenant              value of the {@code tenant} path segment
 * @param namespace           value of the {@code namespace} path segment
 * @param functionName        value of the {@code functionName} path segment
 * @param uploadedInputStream content of the {@code functionMetaData} form part
 * @param delete              value of the {@code delete} form part
 */
@PUT
@ApiOperation(value = "Updates a Pulsar Function on the worker leader", hidden = true)
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "The requester doesn't have super-user permissions"),
        @ApiResponse(code = 404, message = "The function does not exist"),
        @ApiResponse(code = 400, message = "Invalid request"),
        @ApiResponse(code = 307, message = "Redirecting to the worker leader"),
        @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
})
@Path("/leader/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public void updateFunctionOnWorkerLeader(
        final @PathParam("tenant") String tenant,
        final @PathParam("namespace") String namespace,
        final @PathParam("functionName") String functionName,
        final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
        final @FormDataParam("delete") boolean delete) {
    final Functions<? extends WorkerService> delegate = functions();
    delegate.updateFunctionOnWorkerLeader(
            tenant, namespace, functionName, uploadedInputStream, delete,
            uri.getRequestUri(), clientAppId());
}
}