Support ConsumerCryptoFailureAction for consumer and reader (#253)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index aae3359..f5b2b35 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -49,7 +49,7 @@
 
 from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
     LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
-    DeadLetterPolicyBuilder  # noqa: F401
+    DeadLetterPolicyBuilder, ConsumerCryptoFailureAction  # noqa: F401
 
 from pulsar.__about__ import __version__
 
@@ -876,6 +876,7 @@
                   batch_index_ack_enabled=False,
                   regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
                   dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None,
+                  crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -979,6 +980,19 @@
           stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
           exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
           automatically.
+        crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
+          Set the behavior when the decryption fails. The default is to fail the message.
+
+          Supported actions:
+
+          * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
+          * ConsumerCryptoFailureAction.DISCARD:
+            Message is silently acknowledged and not delivered to the application.
+          * ConsumerCryptoFailureAction.CONSUME:
+            Deliver the encrypted message to the application. It's the application's responsibility
+            to decrypt the message. If message is also compressed, decompression will fail. If the
+            message contains batch messages, client will not be able to retrieve individual messages
+            in the batch.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -1002,6 +1016,7 @@
         _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
         _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
         _check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
+        _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -1040,6 +1055,7 @@
         conf.batch_index_ack_enabled(batch_index_ack_enabled)
         if dead_letter_policy:
             conf.dead_letter_policy(dead_letter_policy.policy())
+        conf.crypto_failure_action(crypto_failure_action)
 
         c = Consumer()
         if isinstance(topic, str):
@@ -1068,7 +1084,8 @@
                       subscription_role_prefix=None,
                       is_read_compacted=False,
                       crypto_key_reader: Union[None, CryptoKeyReader] = None,
-                      start_message_id_inclusive=False
+                      start_message_id_inclusive=False,
+                      crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
                       ):
         """
         Create a reader on a particular topic
@@ -1129,6 +1146,19 @@
             and private key decryption messages for the consumer
         start_message_id_inclusive: bool, default=False
             Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
+        crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
+          Set the behavior when the decryption fails. The default is to fail the message.
+
+          Supported actions:
+
+          * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
+          * ConsumerCryptoFailureAction.DISCARD:
+            Message is silently acknowledged and not delivered to the application.
+          * ConsumerCryptoFailureAction.CONSUME:
+            Deliver the encrypted message to the application. It's the application's responsibility
+            to decrypt the message. If message is also compressed, decompression will fail. If the
+            message contains batch messages, client will not be able to retrieve individual messages
+            in the batch.
         """
 
         # If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1144,6 +1174,7 @@
         _check_type(bool, is_read_compacted, 'is_read_compacted')
         _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
         _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
+        _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
 
         conf = _pulsar.ReaderConfiguration()
         if reader_listener:
@@ -1158,6 +1189,7 @@
         if crypto_key_reader:
             conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
         conf.start_message_id_inclusive(start_message_id_inclusive)
+        conf.crypto_failure_action(crypto_failure_action)
 
         c = Reader()
         c._reader = self._client.create_reader(topic, start_message_id, conf)
diff --git a/src/config.cc b/src/config.cc
index 7221b07..06822b4 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -313,7 +313,11 @@
         .def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
              return_value_policy::reference)
         .def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
-        .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
+        .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy)
+        .def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction,
+            return_value_policy::copy)
+        .def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction,
+            return_value_policy::reference);
 
     class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
         .def(init<>())
@@ -331,5 +335,9 @@
         .def("read_compacted", &ReaderConfiguration::setReadCompacted)
         .def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
         .def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
-        .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
+        .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference)
+        .def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction,
+            return_value_policy::copy)
+        .def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction,
+            return_value_policy::reference);
 }
diff --git a/src/enums.cc b/src/enums.cc
index 198edfa..447d013 100644
--- a/src/enums.cc
+++ b/src/enums.cc
@@ -19,6 +19,7 @@
 #include "utils.h"
 #include <pulsar/CompressionType.h>
 #include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
 #include <pulsar/ProducerConfiguration.h>
 #include <pulsar/KeySharedPolicy.h>
 #include <pybind11/pybind11.h>
@@ -140,4 +141,9 @@
         .value("Info", Logger::LEVEL_INFO)
         .value("Warn", Logger::LEVEL_WARN)
         .value("Error", Logger::LEVEL_ERROR);
+    
+    enum_<ConsumerCryptoFailureAction>(m, "ConsumerCryptoFailureAction")
+        .value("FAIL", ConsumerCryptoFailureAction::FAIL)
+        .value("DISCARD", ConsumerCryptoFailureAction::DISCARD)
+        .value("CONSUME", ConsumerCryptoFailureAction::CONSUME);
 }
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 94c04e3..6c0c3f2 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -482,6 +482,63 @@
 
         client.close()
 
+    def test_encryption_failure(self):
+        publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
+        privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
+        crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
+        client = Client(self.serviceUrl)
+        topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time())
+        producer = client.create_producer(
+            topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
+        )
+        producer.send(b"msg-0")
+
+        def verify_next_message(value: bytes):
+            consumer = client.subscribe(topic, subscription,
+                                        crypto_key_reader=crypto_key_reader)
+            msg = consumer.receive(3000)
+            self.assertEqual(msg.data(), value)
+            consumer.acknowledge(msg)
+            consumer.close()
+
+        subscription = "my-sub"
+        consumer = client.subscribe(topic, subscription,
+                                    initial_position=InitialPosition.Earliest,
+                                    crypto_failure_action=pulsar.ConsumerCryptoFailureAction.FAIL)
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(3000)
+        consumer.close()
+        producer.send(b"msg-1")
+        verify_next_message(b"msg-0") # msg-0 won't be skipped
+
+        consumer = client.subscribe(topic, subscription,
+                                    initial_position=InitialPosition.Earliest,
+                                    crypto_failure_action=pulsar.ConsumerCryptoFailureAction.DISCARD)
+        with self.assertRaises(pulsar.Timeout):
+            consumer.receive(3000)
+        consumer.close()
+
+        producer.send(b"msg-2")
+        verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD
+
+        # Encrypted messages will be consumed since the crypto failure action is CONSUME
+        consumer = client.subscribe(topic, 'another-sub',
+                                    initial_position=InitialPosition.Earliest,
+                                    crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
+        for i in range(3):
+            msg = consumer.receive(3000)
+            self.assertNotEqual(msg.data(), f"msg-{i}".encode())
+            self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+
+        reader = client.create_reader(topic, MessageId.earliest,
+                                      crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
+        for i in range(3):
+            msg = reader.read_next(3000)
+            self.assertNotEqual(msg.data(), f"msg-{i}".encode())
+            self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+
+        client.close()
+
     def test_tls_auth3(self):
         authPlugin = "tls"
         authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)