Web Socket Proxy Server provides a simple way to interact with Pulsar under WSS
protocol.
2.11
, there is a feature that provides a way to set encrypt keys for the internal producers and consumers of Web Socket Proxy Server, but needs the user to upload both public key and private key into the Web Socket Proxy Server(in other words: user should expose the keys to Web Socket Proxy Server), there is a un-recommended workaround for this shortcoming[1]. The benefit is that the WSS producer and WSS consumer should not care about encryption and decryption.The Construction of the Encrypt Context:
{ "batchSize": 2, // How many single messages are in the batch. If null, it means it is not a batched message. "compressionType": "NONE", // the compression type. "uncompressedMessageSize": 0, // the size of the uncompressed payload. "keys": { "client-rsa.pem": { // key name. "keyValue": "asdvfdw==", // key value. "metadata": {} // extra props of the key. } }, "param": "Tfu1PxVm6S9D3+Hk" // the IV of current encryption for this message. }
All the fields of Encrypt Context are used to parse the encrypted message payload.
keys
and param
are used to decrypt the encrypted message payload.compressionType
and uncompressedMessageSize
are used to uncompress the compressed message payload.batchSize
is used to extract the batched message payload.There is another attribute named encryptionAlgo
used to identify what encrypt algo is using, it is an optional attribute, so there is no such property in Encrypt Context.
When the internal consumer of the Web Socket Proxy Server receives a message, if the message metadata indicates that the message is encrypted, the consumer will add Encrypt Context into the response for the WSS consumer.
CryptoKeyReader
: an interface that requires users to implement to read public key and private key.MessageCrypto
: a tool interface to encrypt and decrypt the message payload and add and extract encryption information for message metadata.Therefore, there is no way to enable encryption under the WSS protocol and meet the following conditions:
Provide a way to make Web Socket Proxy Server just passes encrypt information to the client, the WSS producer and WSS consumer did encrypt and decrypt themselves.
Since the order of producer operation for message payloads is compression --> encryption,
users need to handle Compression themselves if needed.
If other clients(such as Java, CPP) are sending messages to the topic that the WSS consumer was subscribed to, it is possible that there are some batched messages in the topic, then the WSS consumer will inevitably receive the batched messages. Since the order of consumer operation for message payload is deencryption --> un-compression --> extract the batched messages
, users need to handle Un-compression and Extract Batch Messages themselves.
This proposal does not intend to support the three features:
For WSS producers: Modify the definition of parameter encryptionKeys
to make it can set in two ways:
encryptionKeys
, then Web Socket Proxy Server will still work in the original way, which is defined in the PIP Support encryption in Web Socket Proxy ServerencryptionKeys
, and the encryptionKeys[{key_name}].keyValue
is not empty, Web Socket Proxy Server will mark this Producer as Client-Side Encryption Producer, then discard server-side batch messages, server-side compression, and server-side encryption. The constructor of encryptionKeys
is like below:{ "client-ecdsa.pem": { "keyValue": "BDJfN+Iw==", "metadata": { "k1": "v1" } } }
For WSS consumers: Users can set the parameter cryptoFailureAction
to CONSUME
to directly receive the undecrypted message payload (it was supported before).
For the producers marked as Client-Side Encryption Producers:
CryptoKeyReader
to DummyCryptoKeyReaderImpl
.DummyCryptoKeyReaderImpl
: doesn't provide any public key or private key, and just returns null
.MessageCrypto
to WSSDummyMessageCryptoImpl
to skip the message Server-Side encryption.WSSDummyMessageCryptoImpl
: only set the encryption info into the message metadata and discard payload encryption.enableBatching
to false
to skip Server-Side batch messages building, and print a log if the discarded parameters enableBatching
, batchingMaxMessages
, maxPendingMessages
, batchingMaxPublishDelay
were set.CompressionType
to None
to skip the Server-Side compression, and print a log if the discarded parameter compressionType
was set.enableChunking
to false
(the default value is false
) to prevent unexpected problems if the default setting is changed in the future.For the client-side encryption consumers:
cryptoFailureAction
of the consumer is CONSUME
, just print an DEBUG
level log when receiving an encrypted message if the consumer could not decrypt it(the original log level is WARN
).Define a new mode for the parameter encryptionKeys
: | param name | description| constructor (before encode) | | --- | --- | --- | | encryptionKeys
| Base64 encoded and URL encoded and JSON formatted encryption keys | Map<String, EncryptionKey>
|
Add JSON attributes below: | param name | description | constructor (before encode) | | --- | --- | --- | | compressionType
| Compression type. Do not set it if compression is not performed | CompressionType
| | uncompressedMessageSize
| The size of the payload before compression. Do not set it if compression is not performed | int
| | encryptionParam
| Base64 encoded serialized initialization vector used when the client encrypts | byte[]
|
public void connect() { String protocolAndHostPort = "ws://localhost:55217"; String topicName = "perssitent://public/default/tp1"; String keys = ``` { "client-ecdsa.pem": { "keyValue": "BDJf/72DhLRs0C0/U+vkykeIBfXaaJiwpqPVgWJvV7B7GwqIMvY6OFXdFvi0gx7Co/0xO7vKTHLQP8GZAt8DWrsCb8W1jhxmOjpThHBaksXG0kN+Iw==", "metadata": { "k1": "v1" } } } ``` StringBuilder producerUrL = new StringBuilder(protocolAndHostPort) .append("/ws/v2/producer/persistent/") .append(topicName) .append("?") .append("encryptionKeys=").append(base64AndURLEncode(keys)); WebSocketClient wssClient = new WebSocketClient(); wssClient.start(); Session session = wssClient.connect(this, producerUrL, new ClientUpgradeRequest()).get(); } public void sendMessage() { byte[] payload = "msg-123".getBytes(UTF-8); // [109, 115, 103, 45, 49, 50, 51] String msgKey = "client-ecdsa.pem"; // Compression if needed(optional). CompressionType compressionType = CompressionType.LZ4; msg.uncompressedMessageSize = 5; byte[] compressedPayload = compress(payload); // [109, 115, 103, 45, 49, 50, 51] // Encrypt if needed. bytes[] encryptionParam = getEncryptionParam(); // [-10, -5, -124, 23, 14, -122, 30, 127, 64, 63, 85, -79] String base64EncodedEncryptionParam = base64Encode(encryptionParam); // 9vuEFw6GHn9AP1Wx bytes[] encryptedPayload = encrypt(compressedPayload, encryptionParam); // H2RbToHyfXrAUJq3kCC81wlmpGRU5l4= // Do send. ProducerMessage msg = new ProducerMessage(); msg.key = msgKey; msg.payload = encryptedPayload; msg.encryptionParam = base64EncodedEncryptionParam; msg.compressionType = compressionType; msg.uncompressedMessageSize = uncompressedMessageSize; this.session.getRemote().sendString(toJSON(msg)); }
public void connect() { String protocolAndHostPort = "ws://localhost:55217"; String topicName = "perssitent://public/default/tp1"; StringBuilder consumerUri = new StringBuilder(protocolAndHostPort) .append("/ws/v2/consumer/persistent/") .append(topicName) .append("/") .append(subscriptionName) .append("?") .append("subscriptionType=").append(subscriptionType.toString()) // Set "cryptoFailureAction" to "CONSUME". .append("&").append("cryptoFailureAction=CONSUME"); WebSocketClient wssClient = new WebSocketClient(); wssClient.start(); Session session = wssClient.connect(this, buildConnectURL(), new ClientUpgradeRequest()).get(); } public byte[] messageReceived(String text) { /** * A demo of the parameter "text": * { * "messageId": "CAcQADAA", * "payload": "ApU16CsV0iHO2zbX7T22jhGMzdjE5drm", * "properties": {}, * "publishTime": "2023-08-22T02:40:32.856+08:00", * "redeliveryCount": 0, * "encryptionContext": { * "keys": { * "client-ecdsa.pem": { * "keyValue": "BMQKA==", * "metadata": { * "k1": "v1" * }, * "param": "SnqNyjPetp1dGBa6", * "compressionType": "LZ4", * "uncompressedMessageSize": 7, * "batchSize": null * } * } */ ConsumerMessage msg = parseJsonToObject(text); /** * The constructor of encryptionContext: * { * "client-ecdsa.pem": { * "keyValue": "BMQKA==", * "metadata": { * "k1": "v1" * } * } * } */ EncryptionContext encryptionContext = msg.encryptionContext; // base64Decode and decrypt message payload. byte[] decryptedPayload = decrypt(base64Decode(msg.payload), encryptionContext); //Un-compress is needed. byte[] unCompressedPayload = unCompressIfNeeded(decryptedPayload); return unCompressedPayload; }
[1]: A workaround to avoid exposing the private key to Web Socket Proxy Server(should expose the public key to Web Socket Proxy Server). A quick background: there are three policies when a consumer cannot describe the message payload:
unackMessagesTracker.
How this message is ultimately handled depends on the policy of unacknowledged messages.Workaround
cryptoFailureAction
to CONSUME
for the WSS consumerEncryptionKeyInfo
to null
for the CryptoKeyReader
. This will make the internal consumer of Web Socket Proxy Server decrypt message payload fail.Then the flow of Pub & Sub will be executed like the following: