| # PIP-436: Add decryptFailListener to Consumer |
| |
| # Background knowledge |
| Apache Pulsar supports client-side encryption: producers encrypt messages and consumers decrypt them. Decryption failures are currently handled through `ConsumerCryptoFailureAction`, which provides three options: |
| |
| - FAIL: Fail message consumption (default) |
| |
| - DISCARD: Silently discard the message |
| |
| - CONSUME: Deliver the encrypted message to the application |
| |
| With the introduction of the `isEncrypted` field in `EncryptionContext` ([PIP-432](https://github.com/apache/pulsar/pull/24481)), applications can determine whether a message was successfully decrypted by checking this flag. However, this requires manual inspection of each message. |
| # Motivation |
| **This PIP builds upon and extends [PIP-432](https://github.com/apache/pulsar/pull/24481) by providing a more elegant way to handle decryption failures, as suggested in [this review comment](https://github.com/apache/pulsar/pull/24481#issuecomment-3048640812).** |
| |
| The proposed decryptFailListener would: |
| |
| - Provide a cleaner, more elegant solution for handling decryption failures |
| - Allow more flexible handling of decrypted vs encrypted messages |
| - Let users replace `ConsumerCryptoFailureAction` with a `decryptFailListener` and decide inside the listener whether to discard the message, decrypt it themselves, or handle it in any other way |
| - Maintain backward compatibility with existing ConsumerCryptoFailureAction behavior |
| |
| # Goals |
| |
| ## In Scope |
| - Add new DecryptFailListener and ReaderDecryptFailListener interfaces |
| |
| - Add corresponding builder methods to ConsumerBuilder and ReaderBuilder |
| |
| - Implement the listener callback mechanism in consumer implementations |
| |
| - Ensure proper interaction with existing ConsumerCryptoFailureAction settings |
| |
| - Maintain backward compatibility with existing applications |
| |
| - Enforce mutual exclusivity between decryptFailListener and ConsumerCryptoFailureAction |
| |
| ## Out of Scope |
| |
| - Changes to encryption/decryption algorithms or protocols |
| - Modifications to the core encryption/decryption process |
| - Performance optimizations specific to the new listener |
| - Changes to producer-side encryption logic |
| # High Level Design |
| |
| The solution adds new listener interfaces specifically for decryption failures: |
| ```java |
| public interface DecryptFailListener<T> { |
| void received(Consumer<T> consumer, Message<T> message); |
| } |
| |
| public interface ReaderDecryptFailListener<T> { |
| void received(Reader<T> reader, Message<T> message); |
| } |
| ``` |
| ### Configuration Validation: |
| 1. Mutual Exclusivity Check: |
| ```java |
| if (conf.getDecryptFailListener() != null && conf.getCryptoFailureAction() != null) { |
| throw new InvalidConfigurationException( |
| "decryptFailListener cannot be used with cryptoFailureAction - choose one approach"); |
| } |
| ``` |
| 2. Listener Dependency Check: |
| ```java |
| if (conf.getDecryptFailListener() != null && conf.getMessageListener() == null) { |
| throw new InvalidConfigurationException( |
| "decryptFailListener requires messageListener to be configured"); |
| } |
| ``` |
| 3. Default Behavior: |
| ```java |
| if (conf.getDecryptFailListener() == null && conf.getCryptoFailureAction() == null) { |
| conf.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL); |
| } |
| ``` |
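Taken together, the three checks above can be exercised in one self-contained sketch. Everything in it is a hypothetical stand-in rather than Pulsar client code: `Conf` is a simplified configuration holder, `IllegalArgumentException` substitutes for `InvalidConfigurationException`, and a `null` failure action is assumed to mean "not set by the user", as the snippets above imply.

```java
// Hypothetical stand-ins for illustration only; not the Pulsar client classes.
class DecryptFailConfigSketch {

    enum CryptoFailureAction { FAIL, DISCARD, CONSUME }

    /** Simplified configuration holder; null means "not set by the user". */
    static final class Conf {
        Object messageListener;
        Object decryptFailListener;
        CryptoFailureAction cryptoFailureAction;
    }

    /** Applies the three checks in the order listed in the proposal. */
    static Conf validate(Conf conf) {
        // 1. Mutual exclusivity: the listener and the action cannot both be set.
        if (conf.decryptFailListener != null && conf.cryptoFailureAction != null) {
            throw new IllegalArgumentException(
                    "decryptFailListener cannot be used with cryptoFailureAction");
        }
        // 2. Listener dependency: the failure listener needs a message listener.
        if (conf.decryptFailListener != null && conf.messageListener == null) {
            throw new IllegalArgumentException(
                    "decryptFailListener requires messageListener to be configured");
        }
        // 3. Default: with neither option set, fall back to FAIL.
        if (conf.decryptFailListener == null && conf.cryptoFailureAction == null) {
            conf.cryptoFailureAction = CryptoFailureAction.FAIL;
        }
        return conf;
    }

    public static void main(String[] args) {
        Conf untouched = validate(new Conf());
        System.out.println("default action: " + untouched.cryptoFailureAction);
    }
}
```

Under these assumptions, a configuration that sets only `messageListener` and `decryptFailListener` passes validation and keeps a `null` failure action, so the listener alone decides what happens to undecryptable messages.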
| ### Runtime Behavior: |
| 1. For successful decryption: |
|    - Always routes to `messageListener` |
| |
| 2. For failed decryption: |
|    - If `decryptFailListener` is configured: |
|      - Routes to `decryptFailListener` |
|      - Skips `ConsumerCryptoFailureAction` behavior |
|    - Else: |
|      - Follows configured `ConsumerCryptoFailureAction` |
| |
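The routing rules above can be sketched as a single dispatch function. This is a minimal illustration under stated assumptions, not the consumer implementation: `Msg`, `Listener`, and `dispatch` are hypothetical names, the `decrypted` flag stands in for the outcome of the client's decryption attempt, and a `null` action is treated as `FAIL` to mirror the default described earlier.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; none of these are Pulsar client classes.
class DecryptFailDispatchSketch {

    enum CryptoFailureAction { FAIL, DISCARD, CONSUME }

    /** Minimal stand-in for a received message. */
    static final class Msg {
        final String payload;
        final boolean decrypted; // true when decryption succeeded

        Msg(String payload, boolean decrypted) {
            this.payload = payload;
            this.decrypted = decrypted;
        }
    }

    /** Stand-in for both the message listener and the decrypt-fail listener. */
    interface Listener {
        void received(Msg msg);
    }

    /** Routes one message and returns a label naming the path that was taken. */
    static String dispatch(Msg msg, Listener messageListener,
                           Listener decryptFailListener, CryptoFailureAction action) {
        if (msg.decrypted) {
            // Successful decryption always goes to the regular listener.
            messageListener.received(msg);
            return "messageListener";
        }
        if (decryptFailListener != null) {
            // A configured failure listener takes over; the action is skipped.
            decryptFailListener.received(msg);
            return "decryptFailListener";
        }
        // No failure listener: fall back to the configured action (FAIL if unset).
        CryptoFailureAction effective = (action == null) ? CryptoFailureAction.FAIL : action;
        switch (effective) {
            case CONSUME:
                messageListener.received(msg); // deliver the still-encrypted message
                return "CONSUME";
            case DISCARD:
                return "DISCARD";
            default:
                return "FAIL";
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        Listener onMessage = m -> delivered.add(m.payload);
        Listener onDecryptFail = m -> failed.add(m.payload);

        System.out.println(dispatch(new Msg("a", true), onMessage, onDecryptFail, null));
        System.out.println(dispatch(new Msg("b", false), onMessage, onDecryptFail, null));
        System.out.println(dispatch(new Msg("c", false), onMessage, null, CryptoFailureAction.DISCARD));
    }
}
```

Returning a label from `dispatch` is only a convenience for inspecting the chosen path; what `FAIL` actually does to the consumer is left out of this sketch.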
| |
| ## Public-facing Changes |
| ### Public API |
| #### New interfaces: |
| ```java |
| public interface DecryptFailListener<T> { |
| void received(Consumer<T> consumer, Message<T> message); |
| } |
| |
| public interface ReaderDecryptFailListener<T> { |
| void received(Reader<T> reader, Message<T> message); |
| } |
| ``` |
| #### New ConsumerBuilder methods: |
| ```java |
| ConsumerBuilder<T> decryptFailListener(DecryptFailListener<T> decryptFailListener); |
| ``` |
| #### New ReaderBuilder methods: |
| ```java |
| ReaderBuilder<T> readerDecryptFailListener(ReaderDecryptFailListener<T> readerDecryptFailListener); |
| ``` |
| #### Usage pattern: |
| ```java |
| pulsarClient.newConsumer() |
| .topic("my-topic") |
| .subscriptionName("my-sub") |
| .messageListener(normalListener) |
| .decryptFailListener(failureListener) |
| .subscribe(); |
| ``` |
| **Breaking Changes:** None - this is a purely additive change. |
| |
| # Backward & Forward Compatibility |
| |
| No compatibility concerns. The new listener is optional and existing code using ConsumerCryptoFailureAction will continue to work unchanged. |
| ## Downgrade / Rollback |
| To roll back to a previous Pulsar client version: |
| |
| - Remove the decryptFailListener configuration from application code; applications that still reference it will fail to compile against the older client |
| - Revert to using ConsumerCryptoFailureAction if needed |
| - Downgrade the Pulsar client library |
| |
| Rolling back carries no risk of data loss or corruption. |
| |
| ## Migration Path |
| Existing applications can: |
| |
| - Continue using ConsumerCryptoFailureAction (no changes required) |
| |
| - Migrate to decryptFailListener for more flexible handling |
| |
| - Use both mechanisms during transition period (in separate consumers) |
| |
| The decryptFailListener provides a more flexible alternative to ConsumerCryptoFailureAction. Both mechanisms remain available for backward compatibility, but a single consumer can configure only one of them. |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/jp3bo63n20k58dxq9kxn7q0t09p34k9y |
| * Mailing List voting thread: https://lists.apache.org/thread/c14cctkmjh5g7h55lm30ddsncbrq66n8 |