add base package for encryption (#555)
Breakdown of PR [#552 ](https://github.com/apache/pulsar-client-go/pull/552)
This PR includes base interface and default implementation.
diff --git a/go.sum b/go.sum
index c56f5b7..b2818ee 100644
--- a/go.sum
+++ b/go.sum
@@ -165,6 +165,7 @@
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
diff --git a/pulsar/crypto/crypto_failure_action.go b/pulsar/crypto/crypto_failure_action.go
new file mode 100644
index 0000000..891d740
--- /dev/null
+++ b/pulsar/crypto/crypto_failure_action.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+const (
+ // ProducerCryptoFailureActionFail this is the default option to fail send if crypto operation fails.
+ ProducerCryptoFailureActionFail = iota
+
+ // ProducerCryptoFailureActionSend ingnore crypto failure and proceed with sending unencrypted message.
+ ProducerCryptoFailureActionSend
+)
+
+const (
+ // ConsumerCryptoFailureActionFail this is the default option to fail consume messages until crypto succeeds.
+ ConsumerCryptoFailureActionFail = iota
+
+ // ConsumerCryptoFailureActionDiscard message is silently acknowledged and not delivered to the application
+ ConsumerCryptoFailureActionDiscard
+
+ // ConsumerCryptoFailureActionConsume deliver the encrypted message to the application.
+ // It's the application's responsibility to decrypt the message.
+ // if message is also compressed, decompression will fail.
+ // If message contain batch messages, client will not be able to retrieve
+ // individual messages in the batch.
+ // delivered encrypted message contains EncryptionContext which contains encryption
+ // and compression information in it using which application can decrypt the payload.
+ ConsumerCryptoFailureActionConsume
+)
diff --git a/pulsar/crypto/crypto_key_reader.go b/pulsar/crypto/crypto_key_reader.go
new file mode 100644
index 0000000..51295c1
--- /dev/null
+++ b/pulsar/crypto/crypto_key_reader.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+// KeyReader implement this interface to read and provide public & private keys
+// key pair can be RSA, ECDSA
+type KeyReader interface {
+ // PublicKey get public key that is be used by the producer to encrypt data key
+ PublicKey(keyName string, metadata map[string]string) (*EncryptionKeyInfo, error)
+
+ // PrivateKey get private key that is used by the consumer to decrypt data key
+ PrivateKey(keyName string, metadata map[string]string) (*EncryptionKeyInfo, error)
+}
diff --git a/pulsar/crypto/default_crypto_Key_reader.go b/pulsar/crypto/default_crypto_Key_reader.go
new file mode 100644
index 0000000..6378d4e
--- /dev/null
+++ b/pulsar/crypto/default_crypto_Key_reader.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import "io/ioutil"
+
+// FileKeyReader default implementation of KeyReader
+type FileKeyReader struct {
+ publicKeyPath string
+ privateKeyPath string
+}
+
+func NewFileKeyReader(publicKeyPath, privateKeyPath string) *FileKeyReader {
+ return &FileKeyReader{
+ publicKeyPath: publicKeyPath,
+ privateKeyPath: privateKeyPath,
+ }
+}
+
+// PublicKey read public key from the given path
+func (d *FileKeyReader) PublicKey(keyName string, keyMeta map[string]string) (*EncryptionKeyInfo, error) {
+ return readKey(keyName, d.publicKeyPath, keyMeta)
+}
+
+// PrivateKey read private key from the given path
+func (d *FileKeyReader) PrivateKey(keyName string, keyMeta map[string]string) (*EncryptionKeyInfo, error) {
+ return readKey(keyName, d.privateKeyPath, keyMeta)
+}
+
+func readKey(keyName, path string, keyMeta map[string]string) (*EncryptionKeyInfo, error) {
+ key, err := ioutil.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+ return NewEncryptionKeyInfo(keyName, key, keyMeta), nil
+}
diff --git a/pulsar/crypto/default_crypto_key_reader_test.go b/pulsar/crypto/default_crypto_key_reader_test.go
new file mode 100644
index 0000000..776d08f
--- /dev/null
+++ b/pulsar/crypto/default_crypto_key_reader_test.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestGetPublicKey(t *testing.T) {
+ keyReader := NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", "")
+ keyInfo, err := keyReader.PublicKey("test-key", map[string]string{"key": "value"})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, keyInfo)
+ assert.NotEmpty(t, keyInfo.Metadata())
+ assert.NotEmpty(t, keyInfo.Name())
+ assert.NotEmpty(t, keyInfo.Key())
+ assert.Equal(t, "value", keyInfo.metadata["key"])
+}
+
+func TestGetPrivateKey(t *testing.T) {
+ keyReader := NewFileKeyReader("", "../crypto/testdata/pri_key_rsa.pem")
+ keyInfo, err := keyReader.PrivateKey("test-key", map[string]string{"key": "value"})
+
+ assert.Nil(t, err)
+ assert.NotNil(t, keyInfo)
+ assert.NotEmpty(t, keyInfo.Metadata())
+ assert.NotEmpty(t, keyInfo.Name())
+ assert.NotEmpty(t, keyInfo.Key())
+ assert.Equal(t, "value", keyInfo.metadata["key"])
+}
+
+func TestInvalidKeyPath(t *testing.T) {
+ keyReader := NewFileKeyReader("../crypto/testdata/no_pub_key_rsa.pem", "../crypto/testdata/no_pri_key_rsa.pem")
+
+ // try to read public key
+ keyInfo, err := keyReader.PublicKey("test-pub-key", nil)
+ assert.Nil(t, keyInfo)
+ assert.NotNil(t, err)
+
+ // try to read private key
+ keyInfo, err = keyReader.PrivateKey("test-pri-key", nil)
+ assert.Nil(t, keyInfo)
+ assert.NotNil(t, err)
+}
diff --git a/pulsar/crypto/default_message_crypto.go b/pulsar/crypto/default_message_crypto.go
new file mode 100644
index 0000000..93a0244
--- /dev/null
+++ b/pulsar/crypto/default_message_crypto.go
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ gocrypto "crypto"
+ "crypto/aes"
+ "crypto/cipher"
+ "crypto/md5"
+ "crypto/rand"
+ "crypto/rsa"
+ "crypto/sha1"
+ "crypto/x509"
+ "encoding/pem"
+ "errors"
+ "fmt"
+ "sync"
+
+ "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// DefaultMessageCrypto implmentation of the interface MessageCryto
+type DefaultMessageCrypto struct {
+ // data key which is used to encrypt/decrypt messages
+ dataKey []byte
+
+ // LoadingCache used by the consumer to cache already decrypted key
+ loadingCache sync.Map // map[string][]byte
+
+ encryptedDataKeyMap sync.Map // map[string]EncryptionKeyInfo
+
+ logCtx string
+
+ logger log.Logger
+
+ cipherLock sync.Mutex
+
+ encryptLock sync.Mutex
+}
+
+// NewDefaultMessageCrypto get the instance of message crypto
+func NewDefaultMessageCrypto(logCtx string, keyGenNeeded bool, logger log.Logger) (*DefaultMessageCrypto, error) {
+
+ d := &DefaultMessageCrypto{
+ logCtx: logCtx,
+ loadingCache: sync.Map{},
+ encryptedDataKeyMap: sync.Map{},
+ logger: logger,
+ }
+
+ if keyGenNeeded {
+ key, err := generateDataKey()
+ if err != nil {
+ return d, err
+ }
+ d.dataKey = key
+ }
+
+ return d, nil
+}
+
+// AddPublicKeyCipher encrypt data key using keyCrypto and cache
+func (d *DefaultMessageCrypto) AddPublicKeyCipher(keyNames []string, keyReader KeyReader) error {
+ key, err := generateDataKey()
+ if err != nil {
+ return err
+ }
+
+ d.dataKey = key
+ for _, keyName := range keyNames {
+ err := d.addPublicKeyCipher(keyName, keyReader)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName string, keyReader KeyReader) error {
+ d.cipherLock.Lock()
+ defer d.cipherLock.Unlock()
+ if keyName == "" || keyReader == nil {
+ return fmt.Errorf("keyname or keyreader is null")
+ }
+
+ // read the public key and its info using keyReader
+ keyInfo, err := keyReader.PublicKey(keyName, nil)
+ if err != nil {
+ return err
+ }
+
+ parsedKey, err := d.loadPublicKey(keyInfo.Key())
+ if err != nil {
+ return err
+ }
+
+ // try to cast to RSA key
+ rsaPubKey, ok := parsedKey.(*rsa.PublicKey)
+ if !ok {
+ return fmt.Errorf("only RSA keys are supported")
+ }
+
+ encryptedDataKey, err := rsa.EncryptOAEP(sha1.New(), rand.Reader, rsaPubKey, d.dataKey, nil)
+ if err != nil {
+ return err
+ }
+
+ d.encryptedDataKeyMap.Store(keyName, NewEncryptionKeyInfo(keyName, encryptedDataKey, keyInfo.Metadata()))
+
+ return nil
+}
+
+// RemoveKeyCipher remove encrypted data key from cache
+func (d *DefaultMessageCrypto) RemoveKeyCipher(keyName string) bool {
+ if keyName == "" {
+ return false
+ }
+ d.encryptedDataKeyMap.Delete(keyName)
+ return true
+}
+
+// Encrypt encrypt payload using encryption keys and add encrypted data key
+// to message metadata. Here data key is encrypted
+// using public key
+func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
+ keyReader KeyReader,
+ msgMetadata MessageMetadataSupplier,
+ payload []byte) ([]byte, error) {
+ d.encryptLock.Lock()
+ defer d.encryptLock.Unlock()
+
+ if len(encKeys) == 0 {
+ return payload, nil
+ }
+
+ for _, keyName := range encKeys {
+ // if key is not already loaded, load it
+ if _, ok := d.encryptedDataKeyMap.Load(keyName); !ok {
+ if err := d.addPublicKeyCipher(keyName, keyReader); err != nil {
+ d.logger.Error(err)
+ }
+ }
+
+ // add key to the message metadata
+ if k, ok := d.encryptedDataKeyMap.Load(keyName); ok {
+ keyInfo, keyInfoOk := k.(*EncryptionKeyInfo)
+
+ if keyInfoOk {
+ msgMetadata.UpsertEncryptionkey(*keyInfo)
+ } else {
+ d.logger.Error("failed to get EncryptionKeyInfo for key %v", keyName)
+ }
+ } else {
+ // we should never reach here
+ msg := fmt.Sprintf("%v Failed to find encrypted Data key for key %v", d.logCtx, keyName)
+ d.logger.Errorf(msg)
+ return nil, fmt.Errorf(msg)
+ }
+
+ }
+
+ // generate a new AES cipher with data key
+ c, err := aes.NewCipher(d.dataKey)
+
+ if err != nil {
+ d.logger.Error("failed to create AES cipher")
+ return nil, err
+ }
+
+ // gcm
+ gcm, err := cipher.NewGCM(c)
+
+ if err != nil {
+ d.logger.Error("failed to create gcm")
+ return nil, err
+ }
+
+ // create gcm param
+ nonce := make([]byte, gcm.NonceSize())
+ _, err = rand.Read(nonce)
+
+ if err != nil {
+ d.logger.Error(err)
+ return nil, err
+ }
+
+ // Update message metadata with encryption param
+ msgMetadata.SetEncryptionParam(nonce)
+
+ // encrypt payload using seal function
+ return gcm.Seal(nil, nonce, payload, nil), nil
+}
+
+// Decrypt decrypt the payload using decrypted data key.
+// Here data key is read from from the message
+// metadata and decrypted using private key.
+func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
+ payload []byte,
+ keyReader KeyReader) ([]byte, error) {
+ // if data key is present, attempt to derypt using the existing key
+ if d.dataKey != nil {
+ decryptedData, err := d.getKeyAndDecryptData(msgMetadata, payload)
+ if err != nil {
+ d.logger.Error(err)
+ }
+
+ if decryptedData != nil {
+ return decryptedData, nil
+ }
+ }
+
+ // data key is null or decryption failed. Attempt to regenerate data key
+ encKeys := msgMetadata.EncryptionKeys()
+ var ecKeyInfo *EncryptionKeyInfo
+
+ for _, encKey := range encKeys {
+ if d.decryptDataKey(encKey.Name(), encKey.Key(), encKey.Metadata(), keyReader) {
+ ecKeyInfo = &encKey
+ }
+ }
+
+ if ecKeyInfo == nil || d.dataKey == nil {
+ // unable to decrypt data key
+ return nil, errors.New("unable to decrypt data key")
+ }
+
+ return d.getKeyAndDecryptData(msgMetadata, payload)
+}
+
+func (d *DefaultMessageCrypto) decryptData(dataKeySecret []byte,
+ msgMetadata MessageMetadataSupplier,
+ payload []byte) ([]byte, error) {
+ // get nonce from message metadata
+ nonce := msgMetadata.EncryptionParam()
+
+ c, err := aes.NewCipher(dataKeySecret)
+
+ if err != nil {
+ d.logger.Error(err)
+ return nil, err
+ }
+
+ gcm, err := cipher.NewGCM(c)
+ if err != nil {
+ return nil, err
+ }
+
+ decryptedData, err := gcm.Open(nil, nonce, payload, nil)
+
+ if err != nil {
+ d.logger.Error(err)
+ }
+
+ return decryptedData, err
+}
+
+func (d *DefaultMessageCrypto) getKeyAndDecryptData(msgMetadata MessageMetadataSupplier,
+ payload []byte) ([]byte, error) {
+ // go through all keys to retrieve data key from cache
+ for _, k := range msgMetadata.EncryptionKeys() {
+ msgDataKey := k.Key()
+ keyDigest := fmt.Sprintf("%x", md5.Sum(msgDataKey))
+ if storedSecretKey, ok := d.loadingCache.Load(keyDigest); ok {
+ decryptedData, err := d.decryptData(storedSecretKey.([]byte), msgMetadata, payload)
+ if err != nil {
+ d.logger.Error(err)
+ }
+
+ if decryptedData != nil {
+ return decryptedData, nil
+ }
+ } else {
+ // First time, entry wont be present in cache
+ d.logger.Debugf("%s Failed to decrypt data or data key is not in cache. Will attempt to refresh", d.logCtx)
+ }
+ }
+ return nil, nil
+}
+
+func (d *DefaultMessageCrypto) decryptDataKey(keyName string,
+ encDatakey []byte,
+ keyMeta map[string]string,
+ keyReader KeyReader) bool {
+
+ keyInfo, err := keyReader.PrivateKey(keyName, keyMeta)
+ if err != nil {
+ d.logger.Error(err)
+ return false
+ }
+
+ parsedKey, err := d.loadPrivateKey(keyInfo.Key())
+ if err != nil {
+ d.logger.Error(err)
+ return false
+ }
+
+ rsaPriKey, ok := parsedKey.(*rsa.PrivateKey)
+ if !ok {
+ d.logger.Error("only RSA keys are supported")
+ return false
+ }
+
+ decryptedDataKey, err := rsa.DecryptOAEP(sha1.New(), rand.Reader, rsaPriKey, encDatakey, nil)
+ if err != nil {
+ d.logger.Error(err)
+ return false
+ }
+ d.dataKey = decryptedDataKey
+ d.loadingCache.Store(fmt.Sprintf("%x", md5.Sum(encDatakey)), d.dataKey)
+
+ return true
+}
+
+func (d *DefaultMessageCrypto) loadPrivateKey(key []byte) (gocrypto.PrivateKey, error) {
+ var privateKey gocrypto.PrivateKey
+ priPem, _ := pem.Decode(key)
+ if priPem == nil {
+ return privateKey, fmt.Errorf("failed to decode private key")
+ }
+ genericPrivateKey, err := x509.ParsePKCS1PrivateKey(priPem.Bytes)
+ if err != nil {
+ return privateKey, err
+ }
+ privateKey = genericPrivateKey
+ return privateKey, nil
+}
+
+// read the public key into RSA key
+func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey, error) {
+ var publickKey gocrypto.PublicKey
+
+ pubPem, _ := pem.Decode(key)
+ if pubPem == nil {
+ return publickKey, fmt.Errorf("failed to decode public key")
+ }
+
+ genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
+ if err != nil {
+ return publickKey, err
+ }
+ publickKey = genericPublicKey
+
+ return publickKey, nil
+}
+
+func generateDataKey() ([]byte, error) {
+ key := make([]byte, 32) // generate key of length 256 bits
+ _, err := rand.Read(key) // cryptographically secure random number
+ return key, err
+}
diff --git a/pulsar/crypto/default_message_crypto_test.go b/pulsar/crypto/default_message_crypto_test.go
new file mode 100644
index 0000000..ae126e0
--- /dev/null
+++ b/pulsar/crypto/default_message_crypto_test.go
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ "testing"
+
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestAddPublicKeyCipher(t *testing.T) {
+ msgCrypto, err := NewDefaultMessageCrypto("test-default-crypto", true, log.DefaultNopLogger())
+ assert.Nil(t, err)
+ assert.NotNil(t, msgCrypto)
+
+ // valid keyreader
+ err = msgCrypto.AddPublicKeyCipher(
+ []string{"my-app.key"},
+ NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", ""),
+ )
+ assert.Nil(t, err)
+
+ // invalid keyreader
+ err = msgCrypto.AddPublicKeyCipher(
+ []string{"my-app0.key"},
+ NewFileKeyReader("../crypto/testdata/no_pub_key_rsa.pem", ""),
+ )
+ assert.NotNil(t, err)
+
+ // empty keyreader
+ err = msgCrypto.AddPublicKeyCipher(
+ []string{"my-app1.key"},
+ nil,
+ )
+ assert.NotNil(t, err)
+
+ // keyreader with wrong econding of public key
+ err = msgCrypto.AddPublicKeyCipher(
+ []string{"my-app2.key"},
+ NewFileKeyReader("../crypto/testdata/wrong_encode_pub_key_rsa.pem", ""),
+ )
+ assert.NotNil(t, err)
+
+ // keyreader with truncated pub key
+ err = msgCrypto.AddPublicKeyCipher(
+ []string{"my-app2.key"},
+ NewFileKeyReader("../crypto/testdata/truncated_pub_key_rsa.pem", ""),
+ )
+ assert.NotNil(t, err)
+}
+
+func TestEncrypt(t *testing.T) {
+ msgMetadata := &pb.MessageMetadata{}
+ msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+
+ msg := "my-message-01"
+
+ msgCrypto, err := NewDefaultMessageCrypto("my-app", true, log.DefaultNopLogger())
+ assert.Nil(t, err)
+ assert.NotNil(t, msgCrypto)
+
+ // valid keyreader
+ encryptedData, err := msgCrypto.Encrypt(
+ []string{"my-app.key"},
+ NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", ""),
+ msgMetadataSupplier,
+ []byte(msg),
+ )
+
+ assert.Nil(t, err)
+ assert.NotNil(t, encryptedData)
+
+ // encrypted data key and encryption param must set in
+ // in the message metadata after encryption
+ assert.NotNil(t, msgMetadataSupplier.EncryptionParam())
+ assert.NotEmpty(t, msgMetadataSupplier.EncryptionKeys())
+
+ // invalid keyreader
+ encryptedData, err = msgCrypto.Encrypt(
+ []string{"my-app2.key"},
+ NewFileKeyReader("../crypto/testdata/no_pub_key_rsa.pem", ""),
+ msgMetadataSupplier,
+ []byte(msg),
+ )
+
+ assert.NotNil(t, err)
+ assert.Nil(t, encryptedData)
+}
+
+func TestEncryptDecrypt(t *testing.T) {
+ msgMetadata := &pb.MessageMetadata{}
+ msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+
+ msg := "my-message-01"
+
+ msgCrypto, err := NewDefaultMessageCrypto("my-app", true, log.DefaultNopLogger())
+ assert.Nil(t, err)
+ assert.NotNil(t, msgCrypto)
+
+ // valid keyreader
+ encryptedData, err := msgCrypto.Encrypt(
+ []string{"my-app.key"},
+ NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", ""),
+ msgMetadataSupplier,
+ []byte(msg),
+ )
+
+ assert.Nil(t, err)
+ assert.NotNil(t, encryptedData)
+
+ // encrypted data key and encryption param must set in
+ // in the message metadata after encryption
+ assert.NotNil(t, msgMetadataSupplier.EncryptionParam())
+ assert.NotEmpty(t, msgMetadataSupplier.EncryptionKeys())
+
+ // try to decrypt
+ msgCryptoDecrypt, err := NewDefaultMessageCrypto("my-app", true, log.DefaultNopLogger())
+ assert.Nil(t, err)
+ assert.NotNil(t, msgCrypto)
+
+ // keyreader with invalid private key
+ decryptedData, err := msgCryptoDecrypt.Decrypt(
+ msgMetadataSupplier,
+ encryptedData,
+ NewFileKeyReader("", "../crypto/testdata/no_pri_key_rsa.pem"),
+ )
+ assert.NotNil(t, err)
+ assert.Nil(t, decryptedData)
+
+ // keyreader with wrong encoded private key
+ decryptedData, err = msgCryptoDecrypt.Decrypt(
+ msgMetadataSupplier,
+ encryptedData,
+ NewFileKeyReader("", "../crypto/testdata/wrong_encoded_pri_key_rsa.pem"),
+ )
+ assert.NotNil(t, err)
+ assert.Nil(t, decryptedData)
+
+ // keyreader with valid private key
+ decryptedData, err = msgCryptoDecrypt.Decrypt(
+ msgMetadataSupplier,
+ encryptedData,
+ NewFileKeyReader("", "../crypto/testdata/pri_key_rsa.pem"),
+ )
+
+ assert.Nil(t, err)
+ assert.Equal(t, msg, string(decryptedData))
+}
diff --git a/pulsar/crypto/encryption_key_Info.go b/pulsar/crypto/encryption_key_Info.go
new file mode 100644
index 0000000..8418682
--- /dev/null
+++ b/pulsar/crypto/encryption_key_Info.go
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+// EncryptionKeyInfo
+type EncryptionKeyInfo struct {
+ metadata map[string]string
+ key []byte
+ name string
+}
+
+// NewEncryptionKeyInfo
+func NewEncryptionKeyInfo(name string, key []byte, metadata map[string]string) *EncryptionKeyInfo {
+ return &EncryptionKeyInfo{
+ metadata: metadata,
+ name: name,
+ key: key,
+ }
+}
+
+// GetKey get key
+func (eci *EncryptionKeyInfo) Name() string {
+ return eci.name
+}
+
+// GetValue get value
+func (eci *EncryptionKeyInfo) Key() []byte {
+ return eci.key
+}
+
+// GetMetadata get key metadata
+func (eci *EncryptionKeyInfo) Metadata() map[string]string {
+ return eci.metadata
+}
diff --git a/pulsar/crypto/message_crypto.go b/pulsar/crypto/message_crypto.go
new file mode 100644
index 0000000..4ad1eec
--- /dev/null
+++ b/pulsar/crypto/message_crypto.go
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+// MessageCrypto implement this interface to encrypt and decrypt messages
+type MessageCrypto interface {
+ // AddPublicKeyCipher
+ AddPublicKeyCipher(keyNames []string, keyReader KeyReader) error
+
+ // RemoveKeyCipher remove the key indentified by the keyname from the list
+ RemoveKeyCipher(keyName string) bool
+
+ // Encrypt the payload using the data key and update
+ // message metadata with the keyname and encrypted data key
+ Encrypt(encKeys []string, KeyReader KeyReader, msgMetadata MessageMetadataSupplier, payload []byte) ([]byte, error)
+
+ // Decrypt the payload using the data key.
+ // Keys used to ecnrypt the data key can be retrieved from msgMetadata
+ Decrypt(msgMetadata MessageMetadataSupplier, payload []byte, KeyReader KeyReader) ([]byte, error)
+}
diff --git a/pulsar/crypto/message_metadata.go b/pulsar/crypto/message_metadata.go
new file mode 100644
index 0000000..042a6a0
--- /dev/null
+++ b/pulsar/crypto/message_metadata.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// MessageMetadataSupplier wrapper implementation around message metadata
+type MessageMetadataSupplier interface {
+ // GetEncryptionKeys read all the encryption keys from the MessageMetadata
+ EncryptionKeys() []EncryptionKeyInfo
+
+ // UpsertEncryptionkey add new or update existing EncryptionKeys in to the MessageMetadata
+ UpsertEncryptionkey(EncryptionKeyInfo)
+
+ // GetEncryptionParam read the ecryption parameter from the MessageMetadata
+ EncryptionParam() []byte
+
+ // SetEncryptionParam set encryption parameter in to the MessageMetadata
+ SetEncryptionParam([]byte)
+}
+
+type MessageMetadata struct {
+ messageMetadata *pb.MessageMetadata
+}
+
+func NewMessageMetadataSupplier(messageMetadata *pb.MessageMetadata) MessageMetadataSupplier {
+ return &MessageMetadata{
+ messageMetadata: messageMetadata,
+ }
+}
+
+func (m *MessageMetadata) EncryptionKeys() []EncryptionKeyInfo {
+ if m.messageMetadata != nil {
+ encInfo := []EncryptionKeyInfo{}
+ for _, k := range m.messageMetadata.EncryptionKeys {
+ key := NewEncryptionKeyInfo(k.GetKey(), k.GetValue(), getKeyMetaMap(k.GetMetadata()))
+ encInfo = append(encInfo, *key)
+ }
+ return encInfo
+ }
+ return nil
+}
+
+func (m *MessageMetadata) UpsertEncryptionkey(keyInfo EncryptionKeyInfo) {
+ if m.messageMetadata != nil {
+ idx := m.encryptionKeyPresent(keyInfo)
+ newKey := &pb.EncryptionKeys{
+ Key: &keyInfo.name,
+ Value: keyInfo.Key(),
+ Metadata: getKeyMeta(keyInfo.Metadata()),
+ }
+
+ if idx >= 0 {
+ m.messageMetadata.EncryptionKeys[idx] = newKey
+ } else {
+ m.messageMetadata.EncryptionKeys = append(m.messageMetadata.EncryptionKeys, newKey)
+ }
+ }
+}
+
+func (m *MessageMetadata) EncryptionParam() []byte {
+ if m.messageMetadata != nil {
+ return m.messageMetadata.EncryptionParam
+ }
+ return nil
+}
+
+func (m *MessageMetadata) SetEncryptionParam(param []byte) {
+ if m.messageMetadata != nil {
+ m.messageMetadata.EncryptionParam = param
+ }
+}
+
+func (m *MessageMetadata) encryptionKeyPresent(keyInfo EncryptionKeyInfo) int {
+ if len(m.messageMetadata.EncryptionKeys) > 0 {
+ for idx, k := range m.messageMetadata.EncryptionKeys {
+ if k.GetKey() == keyInfo.Name() {
+ return idx
+ }
+ }
+ }
+ return -1
+}
+
+func getKeyMeta(metaMap map[string]string) []*pb.KeyValue {
+ if len(metaMap) > 0 {
+ meta := []*pb.KeyValue{}
+ for k, v := range metaMap {
+ meta = append(meta, &pb.KeyValue{Key: &k, Value: &v})
+ }
+ return meta
+ }
+ return nil
+}
+
+func getKeyMetaMap(keyValues []*pb.KeyValue) map[string]string {
+ if keyValues != nil {
+ meta := map[string]string{}
+ for _, kv := range keyValues {
+ meta[kv.GetKey()] = kv.GetValue()
+ }
+ return meta
+ }
+ return nil
+}
diff --git a/pulsar/crypto/message_metadata_test.go b/pulsar/crypto/message_metadata_test.go
new file mode 100644
index 0000000..3e8cb38
--- /dev/null
+++ b/pulsar/crypto/message_metadata_test.go
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ "testing"
+
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestGetEncyptionKeys(t *testing.T) {
+ msgMetadata := &pb.MessageMetadata{}
+ name1 := "key-1"
+ value1 := []byte{1, 2, 3, 4}
+
+ name2 := "key-2"
+ value2 := []byte{4, 3, 2, 1}
+
+ name3 := "key-3"
+ value3 := []byte{6, 7, 8, 9}
+
+ msgMetadata.EncryptionKeys = append(msgMetadata.EncryptionKeys, &pb.EncryptionKeys{
+ Key: &name1,
+ Value: value1,
+ Metadata: []*pb.KeyValue{
+ {Key: &name1, Value: &name1},
+ },
+ },
+ &pb.EncryptionKeys{
+ Key: &name2,
+ Value: value2,
+ Metadata: []*pb.KeyValue{
+ {Key: &name1, Value: &name1},
+ {Key: &name2, Value: &name2},
+ },
+ },
+ &pb.EncryptionKeys{
+ Key: &name3,
+ Value: value3,
+ },
+ )
+
+ expected := []EncryptionKeyInfo{
+ {
+ name: name1,
+ key: value1,
+ metadata: map[string]string{
+ "key-1": "key-1",
+ },
+ },
+ {
+ name: name2,
+ key: value2,
+ metadata: map[string]string{
+ "key-1": "key-1",
+ "key-2": "key-2",
+ },
+ },
+ {
+ name: name3,
+ key: value3,
+ },
+ }
+
+ msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+ actual := msgMetadataSupplier.EncryptionKeys()
+
+ assert.EqualValues(t, expected, actual)
+}
+
+func TestUpsertEncryptionKey(t *testing.T) {
+ msgMetadata := &pb.MessageMetadata{}
+ key1 := "key-1"
+ value1 := []byte{1, 2, 3, 4}
+
+ keyInfo := NewEncryptionKeyInfo(key1, value1, map[string]string{"key-1": "value-1"})
+
+ expected := []EncryptionKeyInfo{*keyInfo}
+
+ msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+
+ msgMetadataSupplier.UpsertEncryptionkey(*keyInfo)
+
+ // try to add same key again
+ msgMetadataSupplier.UpsertEncryptionkey(*keyInfo)
+
+ actual := msgMetadataSupplier.EncryptionKeys()
+
+ assert.EqualValues(t, expected, actual)
+}
+
+func TestEncryptionParam(t *testing.T) {
+ msgMetadata := &pb.MessageMetadata{}
+
+ expected := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}
+
+ msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+ msgMetadataSupplier.SetEncryptionParam(expected)
+
+ assert.EqualValues(t, expected, msgMetadataSupplier.EncryptionParam())
+}
diff --git a/pulsar/crypto/testdata/pri_key_rsa.pem b/pulsar/crypto/testdata/pri_key_rsa.pem
new file mode 100644
index 0000000..8b184ff
--- /dev/null
+++ b/pulsar/crypto/testdata/pri_key_rsa.pem
@@ -0,0 +1,39 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIG4wIBAAKCAYEA5YeHRDCRtEn8kVi7xVT2YicRByZZUPjzB9qSlQboHvdIpZhW
+hoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl3J67Pz3H5j4FoKFZ7OTf55Wk2f83
+pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiIhSg/+6wjY98SbZujM1a/iWpM9IFk
+pOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT86gYh1BgsVH4zjQqCP76lxXrsSyn
+lsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0iWcK+OICt0z9WesakKnyAU01BFOmX
+APuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ7bXBxKVIw24za/EsCeb+jLFXVilC
+/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXeQinjmx+iFWvG/uHimrswXaAQbP1Y
+OoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3di2zxEIn7cnLaIC4HgcVxQLUVMsBX
+FXgU2SWVWPb2+fv9AgMBAAECggGAIohVbYjxIvLOsP9soK+7seC2yyCV53zM862K
+yCkV2zLRFzjoK6zW+l6UNlVg5QPuDSlVogVPUfPy1sJCkrrqylqHSWh+HsHODvC3
+mtCEb08wUiv3r1toi+Vod9573/fX+8n9NeOkVjXxA+a8L1NVVtNLbQ3k9S2mlB8K
+Wrr+bio0EcU14ymbvm2hbntMhLZO5iB5vIVwMqTq3MhMdEV5q/rqVdFd+s+eY0et
+21ShVwHRSAKz5Bc5GdOYAV/cYTdaXyR3RJUxy+JT0QW5qJ7FKGa7WIvh2epIdQtf
+N79aXQ6ZpQK8aghd6hintV8pJgxD2EcRaT2AAHmuLj1P74YO7WJJnQy/eXhdq2jl
+xmTqk9TgFQksvin8QtJyFQzFvxWUKN4z7NAnFlDCM7Yj/k8QnA85kWtyBpBuCFhr
+/fapz59Kg5apaAlWKFe9piTezQim8CNgPjdhDjyo7Yyy/rr6XI1ncmC+xKnWpyeH
+gXPxDT+7pC1TAU6DC8DRSizh5DGBAoHBAP3JFDSSwwNP5UP9P2EmslhcKR3Ks6rY
+XuUotK7yy5sXtkxW2+xGOaKyrCjw4WSXqx0FgGndpejG93bBLM+SwhT/OYZYAqju
+f2B9iOFIDshSumpCor19FBn4fnb933RkNS/O9k4iQye6He1L8hleBHMoqDJmFbk2
+vESwNLYrpxuQVfjMbWsLoamwRs2gqwYtd4LvSfwtwxLtTozZhbMufRaTb610VDlZ
+iALDTvWRpGjDLaSoz+agaSDalGHVmnfmhQKBwQDniEPMKAiQM6Nk1jsWfFfu0E/0
+0v55F99naAdJuaG0IkJNtD0WCbjtM25gk82hbcP7XP7Sz+ZaNu8Q16xOpOYvtgn5
+Yqd1/5Hh8oQf7lwRPFfpr/PeHjW1f4qCQokPgSut3E8nKKfQKhhRajJH78JBqfPW
+Q7A90W4zBC62sTDRqbAzkDq1OKlTu5HyuNGmPqnum8olIgsaOsoBIpsCqfeuFElq
+WXCpw8NKOaH8YQP1hd+PhVKrHcNW/EFhhjOqZRkCgcAPcXD9UgDz7qSw4nQ84THx
+FoqZ+X+9YbVElJmKG9Qv991r/80aL5vKPr0jMKVGjcQn2/HYf1hdNd5RJ6gmaXPN
++0nw1uIyjXDK2li9/LiJkB8v4CYvCbFzcx+e9gvm9UIXSqzKTGNxw22WxwxQZtw2
+db7mcjfYMXB7bY2HmFhu4PWaUjZGUUrhHIzyblh548JmAVGrOs2oFTC2eXYdVTLf
+cNFW6MFHTB9uq5vebaJnjZj1cCBWlGRRT3vACFOCAFECgcAlJdzS3b15/X8Cx8iV
+NAAbxfp+Kng/z4+9lJhOwOTr9O80bm26ona0QCM+hZhhhS4Dn4kXI9ousU+sIR55
+Q8XW89sn0ydRLF8opHOEeAb3kPn9+YgkJC6z3zHG8ovxG+V5MLbWbpR2NrrOHT7S
+AermBDGmOBgH4xlOQCaKk2VkzlgB/esddmjckWS6T+L7TGSRbxeA27RyUeplQjsi
+s0iU+pZI5O7JniowN40A5EPxWbhj251G7TCRPUn0LscNWMkCgcEA6BxNQ907FfQu
++srCpZcmthxKdm8cLrONT/U9cc5tJBftL/zlSvGnhU+rFEt/nLOYoroa9dhZlM8/
+i3BXZofMQ36Q76QIaN41fbzzwpDz5aIFlN/cvASk5Yspdv3Lq1dis0FzAcHFweWU
+4jw9m4f9nn81b3QiAjT5aB1Ftqeu9L9r4EZHqAjU+iBN/HpF6W7YPhKKX7VNkX7g
+uUZjERpZq5kg7DWWqhcwLc2ztiRrBmT964Q6CnRTzvYmdyaqyQni
+-----END RSA PRIVATE KEY-----
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/pub_key_rsa.pem b/pulsar/crypto/testdata/pub_key_rsa.pem
new file mode 100644
index 0000000..8e49886
--- /dev/null
+++ b/pulsar/crypto/testdata/pub_key_rsa.pem
@@ -0,0 +1,11 @@
+-----BEGIN PUBLIC KEY-----
+MIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA5YeHRDCRtEn8kVi7xVT2
+YicRByZZUPjzB9qSlQboHvdIpZhWhoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl
+3J67Pz3H5j4FoKFZ7OTf55Wk2f83pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiI
+hSg/+6wjY98SbZujM1a/iWpM9IFkpOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT
+86gYh1BgsVH4zjQqCP76lxXrsSynlsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0i
+WcK+OICt0z9WesakKnyAU01BFOmXAPuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ
+7bXBxKVIw24za/EsCeb+jLFXVilC/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXe
+Qinjmx+iFWvG/uHimrswXaAQbP1YOoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3d
+i2zxEIn7cnLaIC4HgcVxQLUVMsBXFXgU2SWVWPb2+fv9AgMBAAE=
+-----END PUBLIC KEY-----
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/truncated_pub_key_rsa.pem b/pulsar/crypto/testdata/truncated_pub_key_rsa.pem
new file mode 100644
index 0000000..51a0381
--- /dev/null
+++ b/pulsar/crypto/testdata/truncated_pub_key_rsa.pem
@@ -0,0 +1,10 @@
+-----BEGIN PUBLIC KEY-----
+MIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA5YeHRDCRtEn8kVi7xVT2
+YicRByZZUPjzB9qSlQboHvdIpZhWhoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl
+3J67Pz3H5j4FoKFZ7OTf55Wk2f83pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiI
+hSg/+6wjY98SbZujM1a/iWpM9IFkpOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT
+86gYh1BgsVH4zjQqCP76lxXrsSynlsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0i
+WcK+OICt0z9WesakKnyAU01BFOmXAPuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ
+7bXBxKVIw24za/EsCeb+jLFXVilC/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXe
+Qinjmx+iFWvG/uHimrswXaAQbP1YOoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3d
+-----END PUBLIC KEY-----
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/wrong_encode_pub_key_rsa.pem b/pulsar/crypto/testdata/wrong_encode_pub_key_rsa.pem
new file mode 100644
index 0000000..3d40cc2
--- /dev/null
+++ b/pulsar/crypto/testdata/wrong_encode_pub_key_rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA5YeHRDCRtEn8kVi7xVT2
+YicRByZZUPjzB9qSlQboHvdIpZhWhoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl
+3J67Pz3H5j4FoKFZ7OTf55Wk2f83pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiI
+hSg/+6wjY98SbZujM1a/iWpM9IFkpOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT
+86gYh1BgsVH4zjQqCP76lxXrsSynlsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0i
+WcK+OICt0z9WesakKnyAU01BFOmXAPuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ
+7bXBxKVIw24za/EsCeb+jLFXVilC/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXe
+Qinjmx+iFWvG/uHimrswXaAQbP1YOoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3d
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/wrong_encoded_pri_key_rsa.pem b/pulsar/crypto/testdata/wrong_encoded_pri_key_rsa.pem
new file mode 100644
index 0000000..2566ae3
--- /dev/null
+++ b/pulsar/crypto/testdata/wrong_encoded_pri_key_rsa.pem
@@ -0,0 +1,38 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIG4wIBAAKCAYEA5YeHRDCRtEn8kVi7xVT2YicRByZZUPjzB9qSlQboHvdIpZhW
+hoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl3J67Pz3H5j4FoKFZ7OTf55Wk2f83
+pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiIhSg/+6wjY98SbZujM1a/iWpM9IFk
+pOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT86gYh1BgsVH4zjQqCP76lxXrsSyn
+lsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0iWcK+OICt0z9WesakKnyAU01BFOmX
+APuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ7bXBxKVIw24za/EsCeb+jLFXVilC
+/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXeQinjmx+iFWvG/uHimrswXaAQbP1Y
+OoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3di2zxEIn7cnLaIC4HgcVxQLUVMsBX
+FXgU2SWVWPb2+fv9AgMBAAECggGAIohVbYjxIvLOsP9soK+7seC2yyCV53zM862K
+yCkV2zLRFzjoK6zW+l6UNlVg5QPuDSlVogVPUfPy1sJCkrrqylqHSWh+HsHODvC3
+mtCEb08wUiv3r1toi+Vod9573/fX+8n9NeOkVjXxA+a8L1NVVtNLbQ3k9S2mlB8K
+Wrr+bio0EcU14ymbvm2hbntMhLZO5iB5vIVwMqTq3MhMdEV5q/rqVdFd+s+eY0et
+21ShVwHRSAKz5Bc5GdOYAV/cYTdaXyR3RJUxy+JT0QW5qJ7FKGa7WIvh2epIdQtf
+N79aXQ6ZpQK8aghd6hintV8pJgxD2EcRaT2AAHmuLj1P74YO7WJJnQy/eXhdq2jl
+xmTqk9TgFQksvin8QtJyFQzFvxWUKN4z7NAnFlDCM7Yj/k8QnA85kWtyBpBuCFhr
+/fapz59Kg5apaAlWKFe9piTezQim8CNgPjdhDjyo7Yyy/rr6XI1ncmC+xKnWpyeH
+gXPxDT+7pC1TAU6DC8DRSizh5DGBAoHBAP3JFDSSwwNP5UP9P2EmslhcKR3Ks6rY
+XuUotK7yy5sXtkxW2+xGOaKyrCjw4WSXqx0FgGndpejG93bBLM+SwhT/OYZYAqju
+f2B9iOFIDshSumpCor19FBn4fnb933RkNS/O9k4iQye6He1L8hleBHMoqDJmFbk2
+vESwNLYrpxuQVfjMbWsLoamwRs2gqwYtd4LvSfwtwxLtTozZhbMufRaTb610VDlZ
+iALDTvWRpGjDLaSoz+agaSDalGHVmnfmhQKBwQDniEPMKAiQM6Nk1jsWfFfu0E/0
+0v55F99naAdJuaG0IkJNtD0WCbjtM25gk82hbcP7XP7Sz+ZaNu8Q16xOpOYvtgn5
+Yqd1/5Hh8oQf7lwRPFfpr/PeHjW1f4qCQokPgSut3E8nKKfQKhhRajJH78JBqfPW
+Q7A90W4zBC62sTDRqbAzkDq1OKlTu5HyuNGmPqnum8olIgsaOsoBIpsCqfeuFElq
+WXCpw8NKOaH8YQP1hd+PhVKrHcNW/EFhhjOqZRkCgcAPcXD9UgDz7qSw4nQ84THx
+FoqZ+X+9YbVElJmKG9Qv991r/80aL5vKPr0jMKVGjcQn2/HYf1hdNd5RJ6gmaXPN
++0nw1uIyjXDK2li9/LiJkB8v4CYvCbFzcx+e9gvm9UIXSqzKTGNxw22WxwxQZtw2
+db7mcjfYMXB7bY2HmFhu4PWaUjZGUUrhHIzyblh548JmAVGrOs2oFTC2eXYdVTLf
+cNFW6MFHTB9uq5vebaJnjZj1cCBWlGRRT3vACFOCAFECgcAlJdzS3b15/X8Cx8iV
+NAAbxfp+Kng/z4+9lJhOwOTr9O80bm26ona0QCM+hZhhhS4Dn4kXI9ousU+sIR55
+Q8XW89sn0ydRLF8opHOEeAb3kPn9+YgkJC6z3zHG8ovxG+V5MLbWbpR2NrrOHT7S
+AermBDGmOBgH4xlOQCaKk2VkzlgB/esddmjckWS6T+L7TGSRbxeA27RyUeplQjsi
+s0iU+pZI5O7JniowN40A5EPxWbhj251G7TCRPUn0LscNWMkCgcEA6BxNQ907FfQu
++srCpZcmthxKdm8cLrONT/U9cc5tJBftL/zlSvGnhU+rFEt/nLOYoroa9dhZlM8/
+i3BXZofMQ36Q76QIaN41fbzzwpDz5aIFlN/cvASk5Yspdv3Lq1dis0FzAcHFweWU
+4jw9m4f9nn81b3QiAjT5aB1Ftqeu9L9r4EZHqAjU+iBN/HpF6W7YPhKKX7VNkX7g
+uUZjERpZq5kg7DWWqhcwLc2ztiRrBmT964Q6CnRTzvYmdyaqyQni
\ No newline at end of file