PIP-436: Add decryptFailListener to Consumer

Background knowledge

Apache Pulsar supports client-side encryption where messages can be encrypted by producers and decrypted by consumers. The current mechanism for handling decryption failures uses ConsumerCryptoFailureAction which provides three options:

  • FAIL: Fail message consumption (default)

  • DISCARD: Silently discard the message

  • CONSUME: Deliver the encrypted message to the application

With the introduction of isEncrypted field in EncryptionContext (PIP-432), applications can now determine whether a message was successfully decrypted by checking this flag. However, this requires manual inspection of each message.

Motivation

This PIP builds upon and extends PIP-432 by providing a more elegant way to handle decryption failures. Which was introduced in https://github.com/apache/pulsar/pull/24481#issuecomment-3048640812

The proposed decryptFailListener would:

  • Provide a cleaner, more elegant solution for handling decryption failures
  • Allow more flexible handling of decrypted vs encrypted messages
  • With the decryptionFailListener, users can use listener instead of ConsumerCryptoFailureAction. They can decide in the listener to discard, or decrypt, or anything they want
  • Maintain backward compatibility with existing ConsumerCryptoFailureAction behavior

Goals

In Scope

  • Add new DecryptFailListener and ReaderDecryptFailListener interfaces

  • Add corresponding builder methods to ConsumerBuilder and ReaderBuilder

  • Implement the listener callback mechanism in consumer implementations

  • Ensure proper interaction with existing ConsumerCryptoFailureAction settings

  • Maintain backward compatibility with existing applications

  • Enforce mutual exclusivity between decryptFailListener and ConsumerCryptoFailureAction

Out of Scope

  • Changes to encryption/decryption algorithms or protocols
  • Modifications to the core encryption/decryption process
  • Performance optimizations specific to the new listener
  • Changes to producer-side encryption logic

High Level Design

The solution adds new listener interfaces specifically for decryption failures:

public interface DecryptFailListener<T> {
    void received(Consumer<T> consumer, Message<T> message);
}

public interface ReaderDecryptFailListener<T> {
    void received(Reader<T> reader, Message<T> message);
}

Configuration Validation:

1.Mutual Exclusivity Check:

if (conf.getDecryptFailListener() != null && conf.getCryptoFailureAction() != null) {
    throw new InvalidConfigurationException(
        "decryptFailListener cannot be used with cryptoFailureAction - choose one approach");
}

2.Listener Dependency Check:

if (conf.getDecryptFailListener() != null && conf.getMessageListener() == null) {
    throw new InvalidConfigurationException(
        "decryptFailListener requires messageListener to be configured");
}

3.Default Behavior:

if (conf.getDecryptFailListener() == null && conf.getCryptoFailureAction() == null) {
    conf.setCryptoFailureAction(ConsumerCryptoFailureAction.FAIL);
}

Runtime Behavior:

  1. For successful decryption:

    • Always routes to messageListener
  2. For failed decryption:

    • If decryptFailListener is configured:
      • Routes to decryptFailListener
      • Skips ConsumerCryptoFailureAction behavior
    • Else:
      • Follows configured ConsumerCryptoFailureAction

Public-facing Changes

Public API

New interfaces:

public interface DecryptFailListener<T> {
    void received(Consumer<T> consumer, Message<T> message);
}

public interface ReaderDecryptFailListener<T> {
    void received(Reader<T> reader, Message<T> message);
}

New ConsumerBuilder methods:

ConsumerBuilder<T> decryptFailListener(DecryptFailListener<T> decryptFailListener);

New ReaderBuilder methods:

ReaderBuilder<T> readerDecryptFailListener(ReaderDecryptFailListener<T> readerDecryptFailListener);

Usage pattern:

pulsarClient.newConsumer()
    .topic("my-topic")
    .subscriptionName("my-sub")
    .messageListener(normalListener)
    .decryptFailListener(failureListener)
    .subscribe();

Breaking Changes: None - this is a purely additive change.

Backward & Forward Compatibility

No compatibility concerns. The new listener is optional and existing code using ConsumerCryptoFailureAction will continue to work unchanged.

Downgrade / Rollback

Rolling back to a previous Pulsar version:

  • Applications using decryptFailListener will get compilation errors
  • Remove decryptFailListener configuration from application code
  • Revert to using ConsumerCryptoFailureAction if needed
  • Downgrade Pulsar client library
  • No data loss or corruption risk

Migration Path

Existing applications can:

  • Continue using ConsumerCryptoFailureAction (no changes required)

  • Migrate to decryptFailListener for more flexible handling

  • Use both mechanisms during transition period (in separate consumers)

The decryptFailListener provides a more flexible alternative to ConsumerCryptoFailureAction, though both can coexist for backward compatibility.

Links