| # PIP-281: Add notifyError method on PushSource |
| |
| ## Motivation |
| |
| In function framework, when [source.read()](https://github.com/apache/pulsar/blob/f7c0b3c49c9ad8c28d0b00aa30d727850eb8bc04/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java#L496-L506) method throw an exception, it will trigger close function instance. If it is in the k8s environment, it will be restarted, |
| you can use the [PushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java) class and extend it to quickly implement the push message model. |
| It overrides the `read` method and provides the `consume` method for the user to call. |
| |
| However, if the source connector extends from the class, |
| it cannot notify the function framework if it encounters an exception while consuming data internally, |
| in other words, the function call `source.read()` never triggers an exception and never exits the process. |
| |
| |
| ## Goals |
| |
| Add `notifyError` method on PushSource, This method can receive an exception and put the exception in the queue. The next time an exception is `read`, will throws exception. |
| ```java |
| |
| public Record<T> read() throws Exception { |
| Record<T> record = queue.take(); |
| if (record instanceof ErrorNotifierRecord) { |
| throw ((ErrorNotifierRecord) record).getException(); |
| } |
| return record; |
| } |
| |
| |
| /** |
| * Allows the source to notify errors asynchronously. |
| * @param ex |
| */ |
| public void notifyError(Exception ex) { |
| consume(new ErrorNotifierRecord(ex)); |
| } |
| } |
| ``` |
| |
| Just like the implementation of the current [BatchPushSource](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/BatchPushSource.java) |
| |
| |
| ### Compatibility |
| |
| This PIP is to provide a method for users rather than introducing a new interface. |
| |
| - So it is forward compatible |
| - However, connectors using this method are not backward compatible. |
| For example, If a Kafka source connector built upon pulsar-io v3.1 (including features introduced in this PIP) and uses the `notifyError` method, |
| when it switches back to pulsar-io v3.0 (excluding features introduced in this PIP), it will encounter errors during compilation. |
| |
| ### In Scope |
| |
| After this PIP, the source connectors can extends the `PushSource`, and use `notifyError` method to throw exception. Such as: |
| - [KafkaSourceConnector](https://github.com/apache/pulsar/blob/branch-3.0/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java) |
| - [CanalSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/canal/src/main/java/org/apache/pulsar/io/canal/CanalAbstractSource.java#L43) |
| - [MongoSourceConnector](https://github.com/apache/pulsar/blob/82237d3684fe506bcb6426b3b23f413422e6e4fb/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java#L59) |
| - etc. |
| |
| ### Out of Scope |
| None |
| |
| ## Design & Implementation Details |
| |
| - Abstract BatchPushSource logic to AbstractPushSource. |
| - Let PushSource to extends AbstractPushSource to extend a new method(notifyError). |
| |
| Please refer this PR: https://github.com/apache/pulsar/pull/20791 |
| |
| ## Note |
| None |
| |
| |
| ## Concrete Example |
| |
| ### BEFORE |
| - Not possible |
| |
| ### AFTER |
| |
| ```java |
| public class PushSourceTest { |
| |
| PushSource testBatchSource = new PushSource() { |
| @Override |
| public void open(Map config, SourceContext context) throws Exception { |
| |
| } |
| |
| @Override |
| public void close() throws Exception { |
| |
| } |
| }; |
| |
| @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception") |
| public void testNotifyErrors() throws Exception { |
| testBatchSource.notifyError(new RuntimeException("test exception")); |
| testBatchSource.readNext(); |
| } |
| } |
| ``` |
| |
| ## Links |
| None |