Apache Pulsar supports client-side encryption where messages can be encrypted by producers and decrypted by consumers. The current mechanism for handling decryption failures uses ConsumerCryptoFailureAction which provides three options:
FAIL: Fail message consumption (default)
DISCARD: Silently discard the message
CONSUME: Deliver the encrypted message to the application
With the introduction of isEncrypted
field in EncryptionContext
(PIP-432), applications can now determine whether a message was successfully decrypted by checking this flag. However, this requires manual inspection of each message.
This PIP builds upon and extends PIP-432 by providing a more elegant way to handle decryption failures. Which was introduced in https://github.com/apache/pulsar/pull/24481#issuecomment-3048640812
The proposed decryptFailListener would:
decryptionFailListener
, users can use listener instead of ConsumerCryptoFailureAction
. They can decide in the listener to discard, or decrypt, or anything they wantAdd new DecryptFailListener and ReaderDecryptFailListener interfaces
Add corresponding builder methods to ConsumerBuilder and ReaderBuilder
Implement the listener callback mechanism in consumer implementations
Ensure proper interaction with existing ConsumerCryptoFailureAction settings
Maintain backward compatibility with existing applications
Enforce mutual exclusivity between decryptFailListener and ConsumerCryptoFailureAction
The solution adds new listener interfaces specifically for decryption failures:
public interface DecryptFailListener<T> { void received(Consumer<T> consumer, Message<T> message); } public interface ReaderDecryptFailListener<T> { void received(Reader<T> reader, Message<T> message); }
1.Mutual Exclusivity Check:
if (conf.getDecryptFailListener() != null && conf.getCryptoFailureAction() != null) { throw new InvalidConfigurationException( "decryptFailListener cannot be used with cryptoFailureAction - choose one approach"); }
2.Listener Dependency Check:
if (conf.getDecryptFailListener() != null && conf.getMessageListener() == null) { throw new InvalidConfigurationException( "decryptFailListener requires messageListener to be configured"); }
3.Default Behavior:
if (conf.getDecryptFailListener() == null && conf.getCryptoFailureAction() == null) { conf.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL); }
For successful decryption:
messageListener
For failed decryption:
decryptFailListener
is configured:decryptFailListener
ConsumerCryptoFailureAction
behaviorConsumerCryptoFailureAction
public interface DecryptFailListener<T> { void received(Consumer<T> consumer, Message<T> message); } public interface ReaderDecryptFailListener<T> { void received(Reader<T> reader, Message<T> message); }
ConsumerBuilder<T> decryptFailListener(DecryptFailListener<T> decryptFailListener);
ReaderBuilder<T> readerDecryptFailListener(ReaderDecryptFailListener<T> readerDecryptFailListener);
pulsarClient.newConsumer() .topic("my-topic") .subscriptionName("my-sub") .messageListener(normalListener) .decryptFailListener(failureListener) .subscribe();
Breaking Changes: None - this is a purely additive change.
No compatibility concerns. The new listener is optional and existing code using ConsumerCryptoFailureAction will continue to work unchanged.
Rolling back to a previous Pulsar version:
Existing applications can:
Continue using ConsumerCryptoFailureAction (no changes required)
Migrate to decryptFailListener for more flexible handling
Use both mechanisms during transition period (in separate consumers)
The decryptFailListener provides a more flexible alternative to ConsumerCryptoFailureAction, though both can coexist for backward compatibility.