Title: Support terminating Function & Connector with the fatal exception

Background knowledge

The Pulsar Function is a serverless computing framework that runs on top of Pulsar and processes messages.

The Pulsar IO Connector is a framework that allows users to easily integrate Pulsar with external systems, such as databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and sink. A source connector imports data from another system to Pulsar, while a sink connector exports data from Pulsar to another system. The Pulsar IO Connector is implemented based on the Pulsar Function framework. So in the following, we treat the connector as a special kind of function. The function refers to both function and connector.

Function Instance is a running instance of a Pulsar IO Connector that interacts with a specific external system or a Pulsar Function that processes messages from the topic.

Function Framework is a framework for running the Function instance.

Function Context is an interface that provides access to various information and resources for the connector or the function. The function context is passed to the connector or the function when it is initialized, and then can be used to interact with the Pulsar system.

The current implementation of the exception handler

Function instance thread: The function framework initializes a thread for each function instance to handle the core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector, executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the exception, etc. And let's define the Connector thread/Function thread as a thread that is created by the connector or function itself.

Exception handling logic: The function itself can throw exceptions, and this thread will catch the exception and then close the function. This means that the function will stop working until it is restarted manually or automatically by the function framework.

Even though it is not explicitly defined, there are two types of exceptions that should be handled by the function or the framework:

  • Fatal exception: This is an exception that the function cannot recover from by itself and needs to notify the framework to terminate it. These are fatal exceptions that indicate a configuration issue, a logic error, or an incompatible system. The function framework will catch these exceptions, report them to users, and terminate the function.
  • Non-fatal exception is an exception that the function instance don‘t need to be terminated for. It could be handled by the connector or function itself. Or be thrown by the function. This exception won’t cause the function instance to be terminated.

How to handle exceptions thrown from connectors

All the exceptions thrown form the connector are treated as fatal exceptions.

If the exception is thrown from the function instance thread, the function framework will catch the exception and terminate the function instance.

If the exception is thrown from the connector thread that is created by the connector itself, the function framework will not be able to catch the exception and terminate the function instance. The connector will hang forever. The Motivation part will talk more about this case.

If the exception is thrown from the external system, the connector implementation could treat it as a retryable exception and retry to process the message later, or throw it to indicate it as a fatal exception.

How to handle exceptions thrown from functions

All the exceptions thrown from the pulsar function are treated as non-fatal exceptions. The function framework will catch the exception and log it. But it will not terminate the function instance.

There is no way for the function developer to throw a fatal exception to the function framework to terminate the function instance.

Motivation

Currently, the connector and function cannot terminate the function instance if there are fatal exceptions thrown outside the function instance thread. The current implementation of the connector and Pulsar Function exception handler cannot handle the fatal exceptions that are thrown outside the function instance thread.

For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue here: https://github.com/apache/pulsar/issues/9464

The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding the notifyError method to the PushSource class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this does not solve the same problem that all source connectors face because not all connectors are implemented based on the PushSource class.

The problem is same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function framework. We need to provide a way for the function developer to implement it.

We need a way for the connector and function developers to throw fatal exceptions outside the function instance thread. The function framework should catch these exceptions and terminate the function accordingly.

Goals

In Scope

  • Support terminating the function instance with fatal exceptions
  • This proposal will apply both to the Pulsar Function and the Pulsar Connector.

Out of Scope

  • The fixes of the exception-raising issue mentioned in the Motivation part for all the connectors are not included in this PIP. This PIP only provides the feature for the connector developer to terminate the function instance. The fixes should be in several different PRs.

High Level Design

Introduce a new method fatal to the context. All the connector implementation code and the function code can use this context and call the fatal method to terminate the instance while raising a fatal exception.

After the connector or function raises the fatal exception, the function instance thread will be interrupted. The function framework then could catch the exception, log it, and then terminate the function instance.

Detailed Design

Design & Implementation Details

This PIP proposes to add a new methodfatalto the context BaseContext. This method allows the connector or the function code to report a fatal exception to the function framework and terminate the instance. The SinkContext and SourceContext are all inherited from BaseContext. Therefore, all the sink connectors and source connectors can invoke this new method. The pulsar function context class Context is also inherited from BaseContext. Therefore, the function code can also invoke this new method.

In the fatal method, the function instance thread will be interrupted. The function instance thread can then catch the interrupt exception and get the fatal exception. The function framework then logs this exception, reports to the metrics, and finally terminates the function instance.

Tbe behavior when invoking the fatal method:

  • For the connector thread or function thread:
    • Invoke the fatal method
    • Send the exception to the function framework. There is a field deathException in the class JavaInstanceRunnable that is used to store the fatal exception.
    • Interrupt the function instance thread
  • For the function instance thread:
    • Catch the interrupt exception
    • Get the exception from the function framework
    • Report the log and metrics
    • Close the function instance

Public-facing Changes

Public API

Introduce fatal method to the BaseContext:

public interface BaseContext {
    /**
     * Terminate the function instance with a fatal exception.
     *
     * @param t the fatal exception to be raised
     */
    void fatal(Throwable t);
}

Binary protocol

No changes for this part.

Configuration

No changes for this part.

CLI

No changes for this part.

Metrics

No changes for this part.

Monitoring

No changes for this part.

Security Considerations

No security-related changes. The new method fatal will only take effect on the current function instance. It won't affect other function instances even they are in the same function worker.

Backward & Forward Compatibility

Revert

No operation required.

Upgrade

No operation required.

Alternatives

Using futures to handle results or exceptions returned the connector

The benefit of this solution is that it makes the use of exception throwing more intuitive to the connector developer.

But it requires changes to existing interfaces, including Source and Sink, which can complicate connector development. And we still need the fatal method to handle some cases such as terminating the instance in code outside of the message processing logic. This alternative solution can't handle this case.

Meanwhile, the implementation of this solution will also be more complex, involving changes to the core message processing logic of the function framework. We need to turn the entire message processing logic into an asynchronous pattern.

General Notes

Links