Title: [io] Add notifyError method on PushSource

Motivation

In the function framework, when a source's read() method throws an exception, the function instance is closed. In a Kubernetes environment, the instance is then restarted. Connector authors can extend the PushSource class to quickly implement a push-based message model: PushSource overrides the read method and provides a consume method for the user to call.

However, a source connector that extends this class has no way to notify the function framework when it encounters an exception while consuming data internally. In other words, the framework's call to source.read() never throws an exception, so the process never exits.

Goals

Add a notifyError method to PushSource. This method receives an exception and puts it in the queue. When read() later takes that entry from the queue, it throws the exception.


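@Override
public Record<T> read() throws Exception {
  // Blocks until a record, or an error notification, is available.
  Record<T> record = queue.take();
  if (record instanceof ErrorNotifierRecord) {
    // An error was pushed via notifyError(): surface it to the framework.
    throw ((ErrorNotifierRecord) record).getException();
  }
  return record;
}

/**
 * Allows the source to notify errors asynchronously.
 * @param ex the exception to be thrown by read() once it is dequeued
 */
public void notifyError(Exception ex) {
  consume(new ErrorNotifierRecord(ex));
}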

This mirrors the existing implementation in BatchPushSource.
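To see the mechanism in isolation, here is a minimal, self-contained sketch of the same queue-plus-sentinel pattern. It does not depend on pulsar-io: the names `QueueBackedSource` and `ErrorRecord` are hypothetical stand-ins for PushSource and ErrorNotifierRecord, and records are plain objects rather than `Record<T>`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ErrorNotifierSketch {

  // Hypothetical stand-in for ErrorNotifierRecord: a sentinel entry that
  // carries an exception through the same queue as ordinary records.
  static final class ErrorRecord {
    final Exception exception;

    ErrorRecord(Exception exception) {
      this.exception = exception;
    }
  }

  // Hypothetical stand-in for PushSource: producers push with consume(),
  // the framework pulls with read().
  static class QueueBackedSource<T> {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public void consume(T record) throws InterruptedException {
      queue.put(record);
    }

    // Enqueues the error; it is rethrown by the read() call that dequeues it.
    public void notifyError(Exception ex) throws InterruptedException {
      queue.put(new ErrorRecord(ex));
    }

    @SuppressWarnings("unchecked")
    public T read() throws Exception {
      Object next = queue.take(); // blocks until a record or an error arrives
      if (next instanceof ErrorRecord) {
        throw ((ErrorRecord) next).exception;
      }
      return (T) next;
    }
  }

  public static void main(String[] args) throws Exception {
    QueueBackedSource<String> source = new QueueBackedSource<>();
    source.consume("a");
    source.notifyError(new IllegalStateException("broker unreachable"));
    // Records queued before the error are still delivered first.
    System.out.println(source.read());
    try {
      source.read();
    } catch (IllegalStateException e) {
      System.out.println("read() threw: " + e.getMessage());
    }
  }
}
```

Because the error travels through the same FIFO queue, any records consumed before notifyError() are returned first; only then does read() throw.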

Compatibility

This PIP adds a method for users rather than introducing a new interface.

  • It is therefore forward compatible.
  • However, connectors that use this method are not backward compatible. For example, if a Kafka source connector is built on pulsar-io v3.1 (which includes the features introduced in this PIP) and uses the notifyError method, switching it back to pulsar-io v3.0 (which does not include them) causes compilation errors.

In Scope

After this PIP, source connectors can extend PushSource and call the notifyError method to propagate exceptions to the function framework, as shown in the Concrete Example section.
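A connector typically consumes from its external system on a background thread. The sketch below, assuming simplified stand-ins for both PushSource (`MiniPushSource`) and the framework's read loop (`runUntilFailure`), shows such a thread reporting its failure through notifyError so that read() throws instead of blocking forever. All names here are hypothetical, and the real framework's shutdown and restart handling is not modeled.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class NotifyErrorConnectorSketch {

  // Hypothetical minimal stand-in for PushSource: queue, consume, notifyError, read.
  static class MiniPushSource<T> {
    private static final class Failure {
      final Exception ex;
      Failure(Exception ex) { this.ex = ex; }
    }

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public void consume(T record) { queue.add(record); }

    public void notifyError(Exception ex) { queue.add(new Failure(ex)); }

    @SuppressWarnings("unchecked")
    public T read() throws Exception {
      Object next = queue.take();
      if (next instanceof Failure) {
        throw ((Failure) next).ex;
      }
      return (T) next;
    }
  }

  // Hypothetical connector: a worker thread pushes records and, when the
  // external system fails, reports the failure instead of dying silently.
  static class FailingConnector extends MiniPushSource<Integer> {
    private final int recordsBeforeFailure;

    FailingConnector(int recordsBeforeFailure) {
      this.recordsBeforeFailure = recordsBeforeFailure;
    }

    void open() {
      Thread worker = new Thread(() -> {
        try {
          for (int i = 0; i < recordsBeforeFailure; i++) {
            consume(i);
          }
          throw new IOException("connection to external system lost");
        } catch (Exception e) {
          notifyError(e); // without this, read() would block forever
        }
      });
      worker.setDaemon(true);
      worker.start();
    }
  }

  // Hypothetical simplified framework loop: counts records until read() throws.
  static int runUntilFailure(MiniPushSource<?> source) {
    int processed = 0;
    while (true) {
      try {
        source.read();
        processed++;
      } catch (Exception e) {
        System.out.println("source failed after " + processed + " records: " + e.getMessage());
        return processed;
      }
    }
  }

  public static void main(String[] args) {
    FailingConnector connector = new FailingConnector(3);
    connector.open();
    runUntilFailure(connector);
  }
}
```

With notifyError in place, the loop exits after the three queued records, which is the point at which a real function instance would be closed and, in Kubernetes, restarted.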

Out of Scope

None

Design & Implementation Details

  • Extract the shared queue logic of BatchPushSource into a new AbstractPushSource class.
  • Make PushSource extend AbstractPushSource so that it gains the new notifyError method.
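One way to picture this split is the following miniature, assuming the shared base owns the queue, consume(), notifyError(), and a protected readNext(), while each subclass only chooses the public method that exposes it. The class names are hypothetical and deliberately far smaller than the real pulsar-io classes (no Record<T>, no SourceContext, no batching).

```java
import java.util.concurrent.LinkedBlockingQueue;

public class AbstractPushSourceSketch {

  // Hypothetical miniature of AbstractPushSource: owns the queue and the
  // error sentinel so every push-style source shares one implementation.
  abstract static class BasePushSource<T> {
    private static final class ErrorEntry {
      final Exception ex;
      ErrorEntry(Exception ex) { this.ex = ex; }
    }

    private final LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public void consume(T record) { queue.add(record); }

    public void notifyError(Exception ex) { queue.add(new ErrorEntry(ex)); }

    // Shared dequeue logic: returns the next record or rethrows a notified error.
    @SuppressWarnings("unchecked")
    protected T readNext() throws Exception {
      Object next = queue.take();
      if (next instanceof ErrorEntry) {
        throw ((ErrorEntry) next).ex;
      }
      return (T) next;
    }
  }

  // PushSource-like subclass: its pull method simply delegates to the base.
  static class SimplePushSource<T> extends BasePushSource<T> {
    public T read() throws Exception {
      return readNext();
    }
  }

  // BatchPushSource-like subclass: widens readNext() to public, reusing the
  // base logic, so notifyError() is inherited rather than duplicated.
  static class SimpleBatchPushSource<T> extends BasePushSource<T> {
    @Override
    public T readNext() throws Exception {
      return super.readNext();
    }
  }

  public static void main(String[] args) throws Exception {
    SimplePushSource<String> push = new SimplePushSource<>();
    push.notifyError(new RuntimeException("push failed"));
    try {
      push.read();
    } catch (RuntimeException e) {
      System.out.println("push: " + e.getMessage());
    }

    SimpleBatchPushSource<String> batch = new SimpleBatchPushSource<>();
    batch.notifyError(new RuntimeException("batch failed"));
    try {
      batch.readNext();
    } catch (RuntimeException e) {
      System.out.println("batch: " + e.getMessage());
    }
  }
}
```

Because notifyError() lives in the base class, both source types inherit it from a single implementation rather than each maintaining its own copy.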

Please refer to this PR: https://github.com/apache/pulsar/pull/20791

Note

None

Concrete Example

BEFORE

  • Not possible

AFTER

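import java.util.Map;

import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.Test;

public class PushSourceTest {

  // A no-op PushSource: open() and close() do nothing, so the only entries
  // in its queue are those passed to consume() or notifyError().
  PushSource<String> testPushSource = new PushSource<String>() {
    @Override
    public void open(Map<String, Object> config, SourceContext context) throws Exception {
    }

    @Override
    public void close() throws Exception {
    }
  };

  @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
  public void testNotifyErrors() throws Exception {
    // Enqueue an error, as a connector's internal consumer thread would.
    testPushSource.notifyError(new RuntimeException("test exception"));
    // The next read() dequeues the error record and rethrows its exception.
    testPushSource.read();
  }
}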

Links

None