The Pulsar Function is a serverless computing framework that runs on top of Pulsar and processes messages.
The Pulsar IO Connector is a framework that allows users to easily integrate Pulsar with external systems, such as databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and sink. A source connector imports data from another system to Pulsar, while a sink connector exports data from Pulsar to another system. The Pulsar IO Connector is implemented based on the Pulsar Function framework. So in the following, we treat the connector as a special kind of function. The function
refers to both function and connector.
Function Instance is a running instance of a Pulsar IO Connector that interacts with a specific external system or a Pulsar Function that processes messages from the topic.
Function Framework is a framework for running the Function instance.
Function Context is an interface that provides access to various information and resources for the connector or the function. The function context is passed to the connector or the function when it is initialized, and then can be used to interact with the Pulsar system.
Function instance thread: The function framework initializes a thread for each function instance to handle the core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector, executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the exception, etc. And let's define the Connector thread/Function thread as a thread that is created by the connector or function itself.
Exception handling logic: The function itself can throw exceptions, and this thread will catch the exception and then close the function. This means that the function will stop working until it is restarted manually or automatically by the function framework.
Even though it is not explicitly defined, there are two types of exceptions that should be handled by the function or the framework:
All the exceptions thrown form the connector are treated as fatal exceptions.
If the exception is thrown from the function instance thread, the function framework will catch the exception and terminate the function instance.
If the exception is thrown from the connector thread that is created by the connector itself, the function framework will not be able to catch the exception and terminate the function instance. The connector will hang forever. The Motivation
part will talk more about this case.
If the exception is thrown from the external system, the connector implementation could treat it as a retryable exception and retry to process the message later, or throw it to indicate it as a fatal exception.
All the exceptions thrown from the pulsar function are treated as non-fatal exceptions. The function framework will catch the exception and log it. But it will not terminate the function instance.
There is no way for the function developer to throw a fatal exception to the function framework to terminate the function instance.
Currently, the connector and function cannot terminate the function instance if there are fatal exceptions thrown outside the function instance thread. The current implementation of the connector and Pulsar Function exception handler cannot handle the fatal exceptions that are thrown outside the function instance thread.
For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If any fatal exceptions occur in those threads, the function instance thread will not be aware of them and will not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue here: https://github.com/apache/pulsar/issues/9464
The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding the notifyError method to the PushSource
class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this does not solve the same problem that all source connectors face because not all connectors are implemented based on the PushSource
class.
The problem is same for the Pulsar Function. Currently, the function can't throw fatal exceptions to the function framework. We need to provide a way for the function developer to implement it.
We need a way for the connector and function developers to throw fatal exceptions outside the function instance thread. The function framework should catch these exceptions and terminate the function accordingly.
Introduce a new method fatal
to the context. All the connector implementation code and the function code can use this context and call the fatal
method to terminate the instance while raising a fatal exception.
After the connector or function raises the fatal exception, the function instance thread will be interrupted. The function framework then could catch the exception, log it, and then terminate the function instance.
This PIP proposes to add a new methodfatal
to the context BaseContext
. This method allows the connector or the function code to report a fatal exception to the function framework and terminate the instance. The SinkContext
and SourceContext
are all inherited from BaseContext
. Therefore, all the sink connectors and source connectors can invoke this new method. The pulsar function context class Context
is also inherited from BaseContext
. Therefore, the function code can also invoke this new method.
In the fatal
method, the function instance thread will be interrupted. The function instance thread can then catch the interrupt exception and get the fatal exception. The function framework then logs this exception, reports to the metrics, and finally terminates the function instance.
Tbe behavior when invoking the fatal
method:
fatal
methoddeathException
in the class JavaInstanceRunnable
that is used to store the fatal exception.Introduce fatal
method to the BaseContext
:
public interface BaseContext { /** * Terminate the function instance with a fatal exception. * * @param t the fatal exception to be raised */ void fatal(Throwable t); }
No changes for this part.
No changes for this part.
No changes for this part.
No changes for this part.
No changes for this part.
No security-related changes. The new method fatal
will only take effect on the current function instance. It won't affect other function instances even they are in the same function worker.
No operation required.
No operation required.
The benefit of this solution is that it makes the use of exception throwing more intuitive to the connector developer.
But it requires changes to existing interfaces, including Source
and Sink
, which can complicate connector development. And we still need the fatal
method to handle some cases such as terminating the instance in code outside of the message processing logic. This alternative solution can't handle this case.
Meanwhile, the implementation of this solution will also be more complex, involving changes to the core message processing logic of the function framework. We need to turn the entire message processing logic into an asynchronous pattern.