| # PIP-297: Support terminating Function & Connector with the fatal exception |
| |
| # Background knowledge |
| |
| The **Pulsar Function** is a serverless computing framework that runs on top of Pulsar and processes messages. |
| |
| The **Pulsar IO Connector** is a framework that allows users to easily integrate Pulsar with external systems, such as |
| databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage |
| connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and |
| sink. A **source connector** imports data from another system to Pulsar, while a **sink connector** exports data from |
| Pulsar to another system. The Pulsar IO Connector is implemented based on the Pulsar Function framework. So in |
| the following, we treat the connector as a special kind of function. The `function` refers to both function and |
| connector. |
| |
| **Function Instance** is a running instance of a Pulsar IO Connector that interacts with a specific external system or a |
| Pulsar Function that processes messages from the topic. |
| |
| **Function Framework** is a framework for running the Function instance. |
| |
| **Function Context** is an interface that provides access to various information and resources for the connector or the |
| function. The function context is passed to the connector or the function when it is initialized, and then can be used |
| to interact with the Pulsar system. |
| |
| ## The current implementation of the exception handler |
| |
| **Function instance thread**: The function framework initializes a thread for each function instance to handle the |
| core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector, |
| executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the |
| exception, etc. And let's define the **Connector thread/Function thread** as a thread that is created by the connector |
| or function itself. |
| |
| **Exception handling logic**: The function itself can throw exceptions, and this thread will catch the exception and |
| then close the function. This means that the function will stop working until it is restarted manually or |
| automatically by the function framework. |
| |
| Even though it is not explicitly defined, there are two types of exceptions that should be handled by the function or |
| the framework: |
| |
| - **Fatal exception**: This is an exception that the function cannot recover from by itself and needs to notify the |
| framework to terminate it. These are fatal exceptions that indicate a configuration issue, a logic error, or an |
| incompatible system. The function framework will catch these exceptions, report them to users, and terminate the |
| function. |
| - **Non-fatal exception** is an exception that the function instance don't need to be terminated for. It could be |
| handled by the connector or function itself. Or be thrown by the function. This exception won't cause the function |
| instance to be terminated. |
| |
| ### How to handle exceptions thrown from connectors |
| |
| All the exceptions thrown form the connector are treated as fatal exceptions. |
| |
| If the exception is thrown from the function instance thread, the function framework will catch the exception and |
| terminate the function instance. |
| |
| If the exception is thrown from the connector thread that is created by the connector itself, the function framework |
| will not be able to catch the exception and terminate the function instance. The connector will hang forever. |
| The `Motivation` part will talk more about this case. |
| |
| If the exception is thrown from the external system, the connector implementation could treat it as a retryable |
| exception and retry to process the message later, or throw it to indicate it as a fatal exception. |
| |
| ### How to handle exceptions thrown from functions |
| |
| All the exceptions thrown from the pulsar function are treated as non-fatal exceptions. The function framework will |
| catch the exception and log it. But it will not terminate the function instance. |
| |
| There is no way for the function developer to throw a fatal exception to the function framework to terminate the |
| function instance. |
| |
| # Motivation |
| |
| Currently, the connector and function cannot terminate the function instance if there are fatal exceptions thrown |
| outside the function instance thread. The current implementation of the connector and Pulsar Function exception handler |
| cannot handle the fatal exceptions that are thrown outside the function instance thread. |
| |
| For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If |
| any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will |
| not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue |
| here: https://github.com/apache/pulsar/issues/9464 |
| |
| The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from |
| an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has |
| been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding |
| the notifyError method to the `PushSource` class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this |
| does not solve the same problem that all source connectors face because not all connectors are implemented based on |
| the `PushSource` class. |
| |
| The problem is same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function |
| framework. We need to provide a way for the function developer to implement it. |
| |
| We need a way for the connector and function developers to throw fatal exceptions outside the function instance |
| thread. The function framework should catch these exceptions and terminate the function accordingly. |
| |
| # Goals |
| |
| ## In Scope |
| |
| - Support terminating the function instance with fatal exceptions |
| - This proposal will apply both to the Pulsar Function and the Pulsar Connector. |
| |
| ## Out of Scope |
| |
| - The fixes of the exception-raising issue mentioned in the Motivation part for all the connectors are not included in |
| this PIP. This PIP only provides the feature for the connector developer to terminate the function instance. The fixes |
| should be in several different PRs. |
| |
| # High Level Design |
| |
| Introduce a new method `fatal` to the context. All the connector implementation code and the function code |
| can use this context and call the `fatal` method to terminate the instance while raising a fatal exception. |
| |
| After the connector or function raises the fatal exception, the function instance thread will be interrupted. |
| The function framework then could catch the exception, log it, and then terminate the function instance. |
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| |
| This PIP proposes to add a new method`fatal`to the context `BaseContext`. This method allows the connector or the |
| function code to report a fatal exception to the function framework and terminate the instance. The `SinkContext` |
| and `SourceContext` are all inherited from `BaseContext`. Therefore, all the sink connectors and source connectors can |
| invoke this new method. The pulsar function context class `Context` is also inherited from `BaseContext`. Therefore, the |
| function code can also invoke this new method. |
| |
| In the `fatal` method, the function instance thread will be interrupted. The function instance thread can then |
| catch the interrupt exception and get the fatal exception. The function framework then logs this exception, |
| reports to the metrics, and finally terminates the function instance. |
| |
| Tbe behavior when invoking the `fatal` method: |
| |
| - For the connector thread or function thread: |
| - Invoke the `fatal` method |
| - Send the exception to the function framework. There is a field `deathException` in the |
| class `JavaInstanceRunnable` that is used to store the fatal exception. |
| - Interrupt the function instance thread |
| - For the function instance thread: |
| - Catch the interrupt exception |
| - Get the exception from the function framework |
| - Report the log and metrics |
| - Close the function instance |
| |
| ## Public-facing Changes |
| |
| ### Public API |
| |
| Introduce `fatal` method to the `BaseContext`: |
| |
| ```java |
| public interface BaseContext { |
| /** |
| * Terminate the function instance with a fatal exception. |
| * |
| * @param t the fatal exception to be raised |
| */ |
| void fatal(Throwable t); |
| } |
| ``` |
| |
| ### Binary protocol |
| |
| No changes for this part. |
| |
| ### Configuration |
| |
| No changes for this part. |
| |
| ### CLI |
| |
| No changes for this part. |
| |
| ### Metrics |
| |
| No changes for this part. |
| |
| # Monitoring |
| |
| No changes for this part. |
| |
| # Security Considerations |
| |
| No security-related changes. |
| The new method `fatal` will only take effect on the current function instance. It won't affect other function instances |
| even they are in the same function worker. |
| |
| # Backward & Forward Compatibility |
| |
| ## Revert |
| |
| No operation required. |
| |
| ## Upgrade |
| |
| No operation required. |
| |
| # Alternatives |
| |
| ## Using futures to handle results or exceptions returned the connector |
| |
| The benefit of this solution is that it makes the use of exception throwing more intuitive to the connector developer. |
| |
| But it requires changes to existing interfaces, including `Source` and `Sink`, which can complicate connector |
| development. And we still need the `fatal` method to handle some cases such as terminating the instance in code outside |
| of the message processing logic. This alternative solution can't handle this case. |
| |
| Meanwhile, the implementation of this solution will also be more complex, involving changes to the core message |
| processing logic of the function framework. We need to turn the entire message processing logic into an asynchronous |
| pattern. |
| |
| # General Notes |
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/j59gzzwjp8c48lwv5poddm9qzlp2hol0 |
| * Mailing List voting thread: https://lists.apache.org/thread/ggok3c2601mnbdomr65v3pjth3lk6fr8 |