Fix encrption bug with chunked message (#13689)
# Motivation
Fix issue #13688.
Send chunking message failed with `org.apache.pulsar.client.api.PulsarClientException$TimeoutException` when encryption is enabled.
### Modifications
The root cause is that all chunked messages share the same msgMetadata object.
The `EncryptionKeys` will be repeated added into message metadata.
And proto buffer objects do not support serialization with bytes value type.
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 6728fc5..d1cf5ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -77,6 +77,7 @@
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -2621,6 +2622,38 @@
}
@Test
+ public void testCryptoWithChunking() throws Exception {
+ final String topic = "persistent://my-property/my-ns/testCryptoWithChunking" + System.currentTimeMillis();
+ final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
+ final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
+
+ this.conf.setMaxMessageSize(1000);
+
+ @Cleanup
+ PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+
+ @Cleanup
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+ .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
+ @Cleanup
+ Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic)
+ .enableChunking(true)
+ .enableBatching(false)
+ .addEncryptionKey("client-ecdsa.pem")
+ .defaultCryptoKeyReader(ecdsaPublicKeyFile)
+ .create();
+
+ byte[] data = RandomUtils.nextBytes(5100);
+ MessageId id = producer1.send(data);
+ log.info("Message Id={}", id);
+
+ MessageImpl<byte[]> message;
+ message = (MessageImpl<byte[]>) consumer1.receive();
+ Assert.assertEquals(message.getData(), data);
+ Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 1);
+ }
+
+ @Test
public void testDefaultCryptoKeyReader() throws Exception {
final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis();
final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
diff --git a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
index acb8229..ba7a4ab 100644
--- a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
+++ b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
@@ -385,6 +385,7 @@
return;
}
+ msgMetadata.clearEncryptionKeys();
// Update message metadata with encrypted data key
for (String keyName : encKeys) {
if (encryptedDataKeyMap.get(keyName) == null) {