Support ConsumerCryptoFailureAction for consumer and reader (#253)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index aae3359..f5b2b35 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -49,7 +49,7 @@
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
- DeadLetterPolicyBuilder # noqa: F401
+ DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401
from pulsar.__about__ import __version__
@@ -876,6 +876,7 @@
batch_index_ack_enabled=False,
regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None,
+ crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
):
"""
Subscribe to the given topic and subscription combination.
@@ -979,6 +980,19 @@
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
automatically.
+ crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
+ Set the behavior when the decryption fails. The default is to fail the message.
+
+ Supported actions:
+
+ * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
+ * ConsumerCryptoFailureAction.DISCARD:
+ Message is silently acknowledged and not delivered to the application.
+ * ConsumerCryptoFailureAction.CONSUME:
+ Deliver the encrypted message to the application. It's the application's responsibility
+ to decrypt the message. If message is also compressed, decompression will fail. If the
+ message contains batch messages, client will not be able to retrieve individual messages
+ in the batch.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -1002,6 +1016,7 @@
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
_check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
+ _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
@@ -1040,6 +1055,7 @@
conf.batch_index_ack_enabled(batch_index_ack_enabled)
if dead_letter_policy:
conf.dead_letter_policy(dead_letter_policy.policy())
+ conf.crypto_failure_action(crypto_failure_action)
c = Consumer()
if isinstance(topic, str):
@@ -1068,7 +1084,8 @@
subscription_role_prefix=None,
is_read_compacted=False,
crypto_key_reader: Union[None, CryptoKeyReader] = None,
- start_message_id_inclusive=False
+ start_message_id_inclusive=False,
+ crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
):
"""
Create a reader on a particular topic
@@ -1129,6 +1146,19 @@
and private key decryption messages for the consumer
start_message_id_inclusive: bool, default=False
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
+ crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
+ Set the behavior when the decryption fails. The default is to fail the message.
+
+ Supported actions:
+
+ * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
+ * ConsumerCryptoFailureAction.DISCARD:
+ Message is silently acknowledged and not delivered to the application.
+ * ConsumerCryptoFailureAction.CONSUME:
+ Deliver the encrypted message to the application. It's the application's responsibility
+ to decrypt the message. If message is also compressed, decompression will fail. If the
+ message contains batch messages, client will not be able to retrieve individual messages
+ in the batch.
"""
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1144,6 +1174,7 @@
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
+ _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
conf = _pulsar.ReaderConfiguration()
if reader_listener:
@@ -1158,6 +1189,7 @@
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
conf.start_message_id_inclusive(start_message_id_inclusive)
+ conf.crypto_failure_action(crypto_failure_action)
c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
diff --git a/src/config.cc b/src/config.cc
index 7221b07..06822b4 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -313,7 +313,11 @@
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
return_value_policy::reference)
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
- .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
+ .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy)
+ .def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction,
+ return_value_policy::copy)
+ .def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction,
+ return_value_policy::reference);
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
.def(init<>())
@@ -331,5 +335,9 @@
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
- .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
+ .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference)
+ .def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction,
+ return_value_policy::copy)
+ .def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction,
+ return_value_policy::reference);
}
diff --git a/src/enums.cc b/src/enums.cc
index 198edfa..447d013 100644
--- a/src/enums.cc
+++ b/src/enums.cc
@@ -19,6 +19,7 @@
#include "utils.h"
#include <pulsar/CompressionType.h>
#include <pulsar/ConsumerConfiguration.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/KeySharedPolicy.h>
#include <pybind11/pybind11.h>
@@ -140,4 +141,9 @@
.value("Info", Logger::LEVEL_INFO)
.value("Warn", Logger::LEVEL_WARN)
.value("Error", Logger::LEVEL_ERROR);
+
+ enum_<ConsumerCryptoFailureAction>(m, "ConsumerCryptoFailureAction")
+ .value("FAIL", ConsumerCryptoFailureAction::FAIL)
+ .value("DISCARD", ConsumerCryptoFailureAction::DISCARD)
+ .value("CONSUME", ConsumerCryptoFailureAction::CONSUME);
}
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 94c04e3..6c0c3f2 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -482,6 +482,63 @@
client.close()
+ def test_encryption_failure(self):
+ publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
+ privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
+ crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
+ client = Client(self.serviceUrl)
+ topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time())
+ producer = client.create_producer(
+ topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
+ )
+ producer.send(b"msg-0")
+
+ def verify_next_message(value: bytes):
+ consumer = client.subscribe(topic, subscription,
+ crypto_key_reader=crypto_key_reader)
+ msg = consumer.receive(3000)
+ self.assertEqual(msg.data(), value)
+ consumer.acknowledge(msg)
+ consumer.close()
+
+ subscription = "my-sub"
+ consumer = client.subscribe(topic, subscription,
+ initial_position=InitialPosition.Earliest,
+ crypto_failure_action=pulsar.ConsumerCryptoFailureAction.FAIL)
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(3000)
+ consumer.close()
+ producer.send(b"msg-1")
+ verify_next_message(b"msg-0") # msg-0 won't be skipped
+
+ consumer = client.subscribe(topic, subscription,
+ initial_position=InitialPosition.Earliest,
+ crypto_failure_action=pulsar.ConsumerCryptoFailureAction.DISCARD)
+ with self.assertRaises(pulsar.Timeout):
+ consumer.receive(3000)
+ consumer.close()
+
+ producer.send(b"msg-2")
+ verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD
+
+ # Encrypted messages will be consumed since the crypto failure action is CONSUME
+ consumer = client.subscribe(topic, 'another-sub',
+ initial_position=InitialPosition.Earliest,
+ crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
+ for i in range(3):
+ msg = consumer.receive(3000)
+ self.assertNotEqual(msg.data(), f"msg-{i}".encode())
+ self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+
+ reader = client.create_reader(topic, MessageId.earliest,
+ crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
+ for i in range(3):
+ msg = reader.read_next(3000)
+ self.assertNotEqual(msg.data(), f"msg-{i}".encode())
+ self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+
+ client.close()
+
def test_tls_auth3(self):
authPlugin = "tls"
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)