add base package for encryption (#555)

Breakdown of PR  [#552 ](https://github.com/apache/pulsar-client-go/pull/552)

This PR includes base interface and default implementation.
diff --git a/go.sum b/go.sum
index c56f5b7..b2818ee 100644
--- a/go.sum
+++ b/go.sum
@@ -165,6 +165,7 @@
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
diff --git a/pulsar/crypto/crypto_failure_action.go b/pulsar/crypto/crypto_failure_action.go
new file mode 100644
index 0000000..891d740
--- /dev/null
+++ b/pulsar/crypto/crypto_failure_action.go
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+const (
+	// ProducerCryptoFailureActionFail this is the default option to fail send if crypto operation fails.
+	ProducerCryptoFailureActionFail = iota
+
+	// ProducerCryptoFailureActionSend ingnore crypto failure and proceed with sending unencrypted message.
+	ProducerCryptoFailureActionSend
+)
+
+const (
+	// ConsumerCryptoFailureActionFail this is the default option to fail consume messages until crypto succeeds.
+	ConsumerCryptoFailureActionFail = iota
+
+	// ConsumerCryptoFailureActionDiscard  message is silently acknowledged and not delivered to the application
+	ConsumerCryptoFailureActionDiscard
+
+	// ConsumerCryptoFailureActionConsume deliver the encrypted message to the application.
+	// It's the application's responsibility to decrypt the message.
+	// if message is also compressed, decompression will fail.
+	// If message contain batch messages, client will not be able to retrieve
+	// individual messages in the batch.
+	// delivered encrypted message contains EncryptionContext which contains encryption
+	// and compression information in it using which application can decrypt the payload.
+	ConsumerCryptoFailureActionConsume
+)
diff --git a/pulsar/crypto/crypto_key_reader.go b/pulsar/crypto/crypto_key_reader.go
new file mode 100644
index 0000000..51295c1
--- /dev/null
+++ b/pulsar/crypto/crypto_key_reader.go
@@ -0,0 +1,28 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+// KeyReader implement this interface to read and provide public & private keys
+// key pair can be RSA, ECDSA
+type KeyReader interface {
+	// PublicKey get public key that is be used by the producer to encrypt data key
+	PublicKey(keyName string, metadata map[string]string) (*EncryptionKeyInfo, error)
+
+	// PrivateKey get private key that is used by the consumer to decrypt data key
+	PrivateKey(keyName string, metadata map[string]string) (*EncryptionKeyInfo, error)
+}
diff --git a/pulsar/crypto/default_crypto_Key_reader.go b/pulsar/crypto/default_crypto_Key_reader.go
new file mode 100644
index 0000000..6378d4e
--- /dev/null
+++ b/pulsar/crypto/default_crypto_Key_reader.go
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import "io/ioutil"
+
+// FileKeyReader default implementation of KeyReader
+type FileKeyReader struct {
+	publicKeyPath  string
+	privateKeyPath string
+}
+
+func NewFileKeyReader(publicKeyPath, privateKeyPath string) *FileKeyReader {
+	return &FileKeyReader{
+		publicKeyPath:  publicKeyPath,
+		privateKeyPath: privateKeyPath,
+	}
+}
+
+// PublicKey read public key from the given path
+func (d *FileKeyReader) PublicKey(keyName string, keyMeta map[string]string) (*EncryptionKeyInfo, error) {
+	return readKey(keyName, d.publicKeyPath, keyMeta)
+}
+
+// PrivateKey read private key from the given path
+func (d *FileKeyReader) PrivateKey(keyName string, keyMeta map[string]string) (*EncryptionKeyInfo, error) {
+	return readKey(keyName, d.privateKeyPath, keyMeta)
+}
+
+func readKey(keyName, path string, keyMeta map[string]string) (*EncryptionKeyInfo, error) {
+	key, err := ioutil.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+	return NewEncryptionKeyInfo(keyName, key, keyMeta), nil
+}
diff --git a/pulsar/crypto/default_crypto_key_reader_test.go b/pulsar/crypto/default_crypto_key_reader_test.go
new file mode 100644
index 0000000..776d08f
--- /dev/null
+++ b/pulsar/crypto/default_crypto_key_reader_test.go
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestGetPublicKey(t *testing.T) {
+	keyReader := NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", "")
+	keyInfo, err := keyReader.PublicKey("test-key", map[string]string{"key": "value"})
+
+	assert.Nil(t, err)
+	assert.NotNil(t, keyInfo)
+	assert.NotEmpty(t, keyInfo.Metadata())
+	assert.NotEmpty(t, keyInfo.Name())
+	assert.NotEmpty(t, keyInfo.Key())
+	assert.Equal(t, "value", keyInfo.metadata["key"])
+}
+
+func TestGetPrivateKey(t *testing.T) {
+	keyReader := NewFileKeyReader("", "../crypto/testdata/pri_key_rsa.pem")
+	keyInfo, err := keyReader.PrivateKey("test-key", map[string]string{"key": "value"})
+
+	assert.Nil(t, err)
+	assert.NotNil(t, keyInfo)
+	assert.NotEmpty(t, keyInfo.Metadata())
+	assert.NotEmpty(t, keyInfo.Name())
+	assert.NotEmpty(t, keyInfo.Key())
+	assert.Equal(t, "value", keyInfo.metadata["key"])
+}
+
+func TestInvalidKeyPath(t *testing.T) {
+	keyReader := NewFileKeyReader("../crypto/testdata/no_pub_key_rsa.pem", "../crypto/testdata/no_pri_key_rsa.pem")
+
+	// try to read public key
+	keyInfo, err := keyReader.PublicKey("test-pub-key", nil)
+	assert.Nil(t, keyInfo)
+	assert.NotNil(t, err)
+
+	// try to read private key
+	keyInfo, err = keyReader.PrivateKey("test-pri-key", nil)
+	assert.Nil(t, keyInfo)
+	assert.NotNil(t, err)
+}
diff --git a/pulsar/crypto/default_message_crypto.go b/pulsar/crypto/default_message_crypto.go
new file mode 100644
index 0000000..93a0244
--- /dev/null
+++ b/pulsar/crypto/default_message_crypto.go
@@ -0,0 +1,365 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	gocrypto "crypto"
+	"crypto/aes"
+	"crypto/cipher"
+	"crypto/md5"
+	"crypto/rand"
+	"crypto/rsa"
+	"crypto/sha1"
+	"crypto/x509"
+	"encoding/pem"
+	"errors"
+	"fmt"
+	"sync"
+
+	"github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// DefaultMessageCrypto implmentation of the interface MessageCryto
+type DefaultMessageCrypto struct {
+	// data key which is used to encrypt/decrypt messages
+	dataKey []byte
+
+	// LoadingCache used by the consumer to cache already decrypted key
+	loadingCache sync.Map // map[string][]byte
+
+	encryptedDataKeyMap sync.Map // map[string]EncryptionKeyInfo
+
+	logCtx string
+
+	logger log.Logger
+
+	cipherLock sync.Mutex
+
+	encryptLock sync.Mutex
+}
+
+// NewDefaultMessageCrypto get the instance of message crypto
+func NewDefaultMessageCrypto(logCtx string, keyGenNeeded bool, logger log.Logger) (*DefaultMessageCrypto, error) {
+
+	d := &DefaultMessageCrypto{
+		logCtx:              logCtx,
+		loadingCache:        sync.Map{},
+		encryptedDataKeyMap: sync.Map{},
+		logger:              logger,
+	}
+
+	if keyGenNeeded {
+		key, err := generateDataKey()
+		if err != nil {
+			return d, err
+		}
+		d.dataKey = key
+	}
+
+	return d, nil
+}
+
+// AddPublicKeyCipher encrypt data key using keyCrypto and cache
+func (d *DefaultMessageCrypto) AddPublicKeyCipher(keyNames []string, keyReader KeyReader) error {
+	key, err := generateDataKey()
+	if err != nil {
+		return err
+	}
+
+	d.dataKey = key
+	for _, keyName := range keyNames {
+		err := d.addPublicKeyCipher(keyName, keyReader)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName string, keyReader KeyReader) error {
+	d.cipherLock.Lock()
+	defer d.cipherLock.Unlock()
+	if keyName == "" || keyReader == nil {
+		return fmt.Errorf("keyname or keyreader is null")
+	}
+
+	// read the public key and its info using keyReader
+	keyInfo, err := keyReader.PublicKey(keyName, nil)
+	if err != nil {
+		return err
+	}
+
+	parsedKey, err := d.loadPublicKey(keyInfo.Key())
+	if err != nil {
+		return err
+	}
+
+	// try to cast to RSA key
+	rsaPubKey, ok := parsedKey.(*rsa.PublicKey)
+	if !ok {
+		return fmt.Errorf("only RSA keys are supported")
+	}
+
+	encryptedDataKey, err := rsa.EncryptOAEP(sha1.New(), rand.Reader, rsaPubKey, d.dataKey, nil)
+	if err != nil {
+		return err
+	}
+
+	d.encryptedDataKeyMap.Store(keyName, NewEncryptionKeyInfo(keyName, encryptedDataKey, keyInfo.Metadata()))
+
+	return nil
+}
+
+// RemoveKeyCipher remove encrypted data key from cache
+func (d *DefaultMessageCrypto) RemoveKeyCipher(keyName string) bool {
+	if keyName == "" {
+		return false
+	}
+	d.encryptedDataKeyMap.Delete(keyName)
+	return true
+}
+
+// Encrypt encrypt payload using encryption keys and add encrypted data key
+// to message metadata. Here data key is encrypted
+// using public key
+func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
+	keyReader KeyReader,
+	msgMetadata MessageMetadataSupplier,
+	payload []byte) ([]byte, error) {
+	d.encryptLock.Lock()
+	defer d.encryptLock.Unlock()
+
+	if len(encKeys) == 0 {
+		return payload, nil
+	}
+
+	for _, keyName := range encKeys {
+		// if key is not already loaded, load it
+		if _, ok := d.encryptedDataKeyMap.Load(keyName); !ok {
+			if err := d.addPublicKeyCipher(keyName, keyReader); err != nil {
+				d.logger.Error(err)
+			}
+		}
+
+		// add key to the message metadata
+		if k, ok := d.encryptedDataKeyMap.Load(keyName); ok {
+			keyInfo, keyInfoOk := k.(*EncryptionKeyInfo)
+
+			if keyInfoOk {
+				msgMetadata.UpsertEncryptionkey(*keyInfo)
+			} else {
+				d.logger.Error("failed to get EncryptionKeyInfo for key %v", keyName)
+			}
+		} else {
+			// we should never reach here
+			msg := fmt.Sprintf("%v Failed to find encrypted Data key for key %v", d.logCtx, keyName)
+			d.logger.Errorf(msg)
+			return nil, fmt.Errorf(msg)
+		}
+
+	}
+
+	// generate a new AES cipher with data key
+	c, err := aes.NewCipher(d.dataKey)
+
+	if err != nil {
+		d.logger.Error("failed to create AES cipher")
+		return nil, err
+	}
+
+	// gcm
+	gcm, err := cipher.NewGCM(c)
+
+	if err != nil {
+		d.logger.Error("failed to create gcm")
+		return nil, err
+	}
+
+	// create gcm param
+	nonce := make([]byte, gcm.NonceSize())
+	_, err = rand.Read(nonce)
+
+	if err != nil {
+		d.logger.Error(err)
+		return nil, err
+	}
+
+	// Update message metadata with encryption param
+	msgMetadata.SetEncryptionParam(nonce)
+
+	// encrypt payload using seal function
+	return gcm.Seal(nil, nonce, payload, nil), nil
+}
+
+// Decrypt decrypt the payload using decrypted data key.
+// Here data key is read from from the message
+// metadata and  decrypted using private key.
+func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
+	payload []byte,
+	keyReader KeyReader) ([]byte, error) {
+	// if data key is present, attempt to derypt using the existing key
+	if d.dataKey != nil {
+		decryptedData, err := d.getKeyAndDecryptData(msgMetadata, payload)
+		if err != nil {
+			d.logger.Error(err)
+		}
+
+		if decryptedData != nil {
+			return decryptedData, nil
+		}
+	}
+
+	// data key is null or decryption failed. Attempt to regenerate data key
+	encKeys := msgMetadata.EncryptionKeys()
+	var ecKeyInfo *EncryptionKeyInfo
+
+	for _, encKey := range encKeys {
+		if d.decryptDataKey(encKey.Name(), encKey.Key(), encKey.Metadata(), keyReader) {
+			ecKeyInfo = &encKey
+		}
+	}
+
+	if ecKeyInfo == nil || d.dataKey == nil {
+		// unable to decrypt data key
+		return nil, errors.New("unable to decrypt data key")
+	}
+
+	return d.getKeyAndDecryptData(msgMetadata, payload)
+}
+
+func (d *DefaultMessageCrypto) decryptData(dataKeySecret []byte,
+	msgMetadata MessageMetadataSupplier,
+	payload []byte) ([]byte, error) {
+	// get nonce from message metadata
+	nonce := msgMetadata.EncryptionParam()
+
+	c, err := aes.NewCipher(dataKeySecret)
+
+	if err != nil {
+		d.logger.Error(err)
+		return nil, err
+	}
+
+	gcm, err := cipher.NewGCM(c)
+	if err != nil {
+		return nil, err
+	}
+
+	decryptedData, err := gcm.Open(nil, nonce, payload, nil)
+
+	if err != nil {
+		d.logger.Error(err)
+	}
+
+	return decryptedData, err
+}
+
+func (d *DefaultMessageCrypto) getKeyAndDecryptData(msgMetadata MessageMetadataSupplier,
+	payload []byte) ([]byte, error) {
+	// go through all keys to retrieve data key from cache
+	for _, k := range msgMetadata.EncryptionKeys() {
+		msgDataKey := k.Key()
+		keyDigest := fmt.Sprintf("%x", md5.Sum(msgDataKey))
+		if storedSecretKey, ok := d.loadingCache.Load(keyDigest); ok {
+			decryptedData, err := d.decryptData(storedSecretKey.([]byte), msgMetadata, payload)
+			if err != nil {
+				d.logger.Error(err)
+			}
+
+			if decryptedData != nil {
+				return decryptedData, nil
+			}
+		} else {
+			// First time, entry wont be present in cache
+			d.logger.Debugf("%s Failed to decrypt data or data key is not in cache. Will attempt to refresh", d.logCtx)
+		}
+	}
+	return nil, nil
+}
+
+func (d *DefaultMessageCrypto) decryptDataKey(keyName string,
+	encDatakey []byte,
+	keyMeta map[string]string,
+	keyReader KeyReader) bool {
+
+	keyInfo, err := keyReader.PrivateKey(keyName, keyMeta)
+	if err != nil {
+		d.logger.Error(err)
+		return false
+	}
+
+	parsedKey, err := d.loadPrivateKey(keyInfo.Key())
+	if err != nil {
+		d.logger.Error(err)
+		return false
+	}
+
+	rsaPriKey, ok := parsedKey.(*rsa.PrivateKey)
+	if !ok {
+		d.logger.Error("only RSA keys are supported")
+		return false
+	}
+
+	decryptedDataKey, err := rsa.DecryptOAEP(sha1.New(), rand.Reader, rsaPriKey, encDatakey, nil)
+	if err != nil {
+		d.logger.Error(err)
+		return false
+	}
+	d.dataKey = decryptedDataKey
+	d.loadingCache.Store(fmt.Sprintf("%x", md5.Sum(encDatakey)), d.dataKey)
+
+	return true
+}
+
+func (d *DefaultMessageCrypto) loadPrivateKey(key []byte) (gocrypto.PrivateKey, error) {
+	var privateKey gocrypto.PrivateKey
+	priPem, _ := pem.Decode(key)
+	if priPem == nil {
+		return privateKey, fmt.Errorf("failed to decode private key")
+	}
+	genericPrivateKey, err := x509.ParsePKCS1PrivateKey(priPem.Bytes)
+	if err != nil {
+		return privateKey, err
+	}
+	privateKey = genericPrivateKey
+	return privateKey, nil
+}
+
+// read the public key into RSA key
+func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey, error) {
+	var publickKey gocrypto.PublicKey
+
+	pubPem, _ := pem.Decode(key)
+	if pubPem == nil {
+		return publickKey, fmt.Errorf("failed to decode public key")
+	}
+
+	genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
+	if err != nil {
+		return publickKey, err
+	}
+	publickKey = genericPublicKey
+
+	return publickKey, nil
+}
+
+func generateDataKey() ([]byte, error) {
+	key := make([]byte, 32)  // generate key of length 256 bits
+	_, err := rand.Read(key) // cryptographically secure random number
+	return key, err
+}
diff --git a/pulsar/crypto/default_message_crypto_test.go b/pulsar/crypto/default_message_crypto_test.go
new file mode 100644
index 0000000..ae126e0
--- /dev/null
+++ b/pulsar/crypto/default_message_crypto_test.go
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	"testing"
+
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+	"github.com/apache/pulsar-client-go/pulsar/log"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestAddPublicKeyCipher(t *testing.T) {
+	msgCrypto, err := NewDefaultMessageCrypto("test-default-crypto", true, log.DefaultNopLogger())
+	assert.Nil(t, err)
+	assert.NotNil(t, msgCrypto)
+
+	// valid keyreader
+	err = msgCrypto.AddPublicKeyCipher(
+		[]string{"my-app.key"},
+		NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", ""),
+	)
+	assert.Nil(t, err)
+
+	// invalid keyreader
+	err = msgCrypto.AddPublicKeyCipher(
+		[]string{"my-app0.key"},
+		NewFileKeyReader("../crypto/testdata/no_pub_key_rsa.pem", ""),
+	)
+	assert.NotNil(t, err)
+
+	// empty keyreader
+	err = msgCrypto.AddPublicKeyCipher(
+		[]string{"my-app1.key"},
+		nil,
+	)
+	assert.NotNil(t, err)
+
+	// keyreader with wrong econding of public key
+	err = msgCrypto.AddPublicKeyCipher(
+		[]string{"my-app2.key"},
+		NewFileKeyReader("../crypto/testdata/wrong_encode_pub_key_rsa.pem", ""),
+	)
+	assert.NotNil(t, err)
+
+	// keyreader with truncated pub key
+	err = msgCrypto.AddPublicKeyCipher(
+		[]string{"my-app2.key"},
+		NewFileKeyReader("../crypto/testdata/truncated_pub_key_rsa.pem", ""),
+	)
+	assert.NotNil(t, err)
+}
+
+func TestEncrypt(t *testing.T) {
+	msgMetadata := &pb.MessageMetadata{}
+	msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+
+	msg := "my-message-01"
+
+	msgCrypto, err := NewDefaultMessageCrypto("my-app", true, log.DefaultNopLogger())
+	assert.Nil(t, err)
+	assert.NotNil(t, msgCrypto)
+
+	// valid keyreader
+	encryptedData, err := msgCrypto.Encrypt(
+		[]string{"my-app.key"},
+		NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", ""),
+		msgMetadataSupplier,
+		[]byte(msg),
+	)
+
+	assert.Nil(t, err)
+	assert.NotNil(t, encryptedData)
+
+	// encrypted data key and encryption param must set in
+	// in the message metadata after encryption
+	assert.NotNil(t, msgMetadataSupplier.EncryptionParam())
+	assert.NotEmpty(t, msgMetadataSupplier.EncryptionKeys())
+
+	// invalid keyreader
+	encryptedData, err = msgCrypto.Encrypt(
+		[]string{"my-app2.key"},
+		NewFileKeyReader("../crypto/testdata/no_pub_key_rsa.pem", ""),
+		msgMetadataSupplier,
+		[]byte(msg),
+	)
+
+	assert.NotNil(t, err)
+	assert.Nil(t, encryptedData)
+}
+
+func TestEncryptDecrypt(t *testing.T) {
+	msgMetadata := &pb.MessageMetadata{}
+	msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+
+	msg := "my-message-01"
+
+	msgCrypto, err := NewDefaultMessageCrypto("my-app", true, log.DefaultNopLogger())
+	assert.Nil(t, err)
+	assert.NotNil(t, msgCrypto)
+
+	// valid keyreader
+	encryptedData, err := msgCrypto.Encrypt(
+		[]string{"my-app.key"},
+		NewFileKeyReader("../crypto/testdata/pub_key_rsa.pem", ""),
+		msgMetadataSupplier,
+		[]byte(msg),
+	)
+
+	assert.Nil(t, err)
+	assert.NotNil(t, encryptedData)
+
+	// encrypted data key and encryption param must set in
+	// in the message metadata after encryption
+	assert.NotNil(t, msgMetadataSupplier.EncryptionParam())
+	assert.NotEmpty(t, msgMetadataSupplier.EncryptionKeys())
+
+	// try to decrypt
+	msgCryptoDecrypt, err := NewDefaultMessageCrypto("my-app", true, log.DefaultNopLogger())
+	assert.Nil(t, err)
+	assert.NotNil(t, msgCrypto)
+
+	// keyreader with invalid private key
+	decryptedData, err := msgCryptoDecrypt.Decrypt(
+		msgMetadataSupplier,
+		encryptedData,
+		NewFileKeyReader("", "../crypto/testdata/no_pri_key_rsa.pem"),
+	)
+	assert.NotNil(t, err)
+	assert.Nil(t, decryptedData)
+
+	// keyreader with wrong encoded private key
+	decryptedData, err = msgCryptoDecrypt.Decrypt(
+		msgMetadataSupplier,
+		encryptedData,
+		NewFileKeyReader("", "../crypto/testdata/wrong_encoded_pri_key_rsa.pem"),
+	)
+	assert.NotNil(t, err)
+	assert.Nil(t, decryptedData)
+
+	// keyreader with valid private key
+	decryptedData, err = msgCryptoDecrypt.Decrypt(
+		msgMetadataSupplier,
+		encryptedData,
+		NewFileKeyReader("", "../crypto/testdata/pri_key_rsa.pem"),
+	)
+
+	assert.Nil(t, err)
+	assert.Equal(t, msg, string(decryptedData))
+}
diff --git a/pulsar/crypto/encryption_key_Info.go b/pulsar/crypto/encryption_key_Info.go
new file mode 100644
index 0000000..8418682
--- /dev/null
+++ b/pulsar/crypto/encryption_key_Info.go
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+// EncryptionKeyInfo
+type EncryptionKeyInfo struct {
+	metadata map[string]string
+	key      []byte
+	name     string
+}
+
+// NewEncryptionKeyInfo
+func NewEncryptionKeyInfo(name string, key []byte, metadata map[string]string) *EncryptionKeyInfo {
+	return &EncryptionKeyInfo{
+		metadata: metadata,
+		name:     name,
+		key:      key,
+	}
+}
+
+// GetKey get key
+func (eci *EncryptionKeyInfo) Name() string {
+	return eci.name
+}
+
+// GetValue get value
+func (eci *EncryptionKeyInfo) Key() []byte {
+	return eci.key
+}
+
+// GetMetadata get key metadata
+func (eci *EncryptionKeyInfo) Metadata() map[string]string {
+	return eci.metadata
+}
diff --git a/pulsar/crypto/message_crypto.go b/pulsar/crypto/message_crypto.go
new file mode 100644
index 0000000..4ad1eec
--- /dev/null
+++ b/pulsar/crypto/message_crypto.go
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+// MessageCrypto implement this interface to encrypt and decrypt messages
+type MessageCrypto interface {
+	// AddPublicKeyCipher
+	AddPublicKeyCipher(keyNames []string, keyReader KeyReader) error
+
+	// RemoveKeyCipher remove the key indentified by the keyname from the list
+	RemoveKeyCipher(keyName string) bool
+
+	// Encrypt the payload using the data key and update
+	// message metadata with the keyname and encrypted data key
+	Encrypt(encKeys []string, KeyReader KeyReader, msgMetadata MessageMetadataSupplier, payload []byte) ([]byte, error)
+
+	// Decrypt the payload using the data key.
+	// Keys used to ecnrypt the data key can be retrieved from msgMetadata
+	Decrypt(msgMetadata MessageMetadataSupplier, payload []byte, KeyReader KeyReader) ([]byte, error)
+}
diff --git a/pulsar/crypto/message_metadata.go b/pulsar/crypto/message_metadata.go
new file mode 100644
index 0000000..042a6a0
--- /dev/null
+++ b/pulsar/crypto/message_metadata.go
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// MessageMetadataSupplier wrapper implementation around message metadata
+type MessageMetadataSupplier interface {
+	// GetEncryptionKeys read all the encryption keys from the MessageMetadata
+	EncryptionKeys() []EncryptionKeyInfo
+
+	// UpsertEncryptionkey add new or update existing EncryptionKeys in to the MessageMetadata
+	UpsertEncryptionkey(EncryptionKeyInfo)
+
+	// GetEncryptionParam read the ecryption parameter from the MessageMetadata
+	EncryptionParam() []byte
+
+	// SetEncryptionParam set encryption parameter in to the MessageMetadata
+	SetEncryptionParam([]byte)
+}
+
+type MessageMetadata struct {
+	messageMetadata *pb.MessageMetadata
+}
+
+func NewMessageMetadataSupplier(messageMetadata *pb.MessageMetadata) MessageMetadataSupplier {
+	return &MessageMetadata{
+		messageMetadata: messageMetadata,
+	}
+}
+
+func (m *MessageMetadata) EncryptionKeys() []EncryptionKeyInfo {
+	if m.messageMetadata != nil {
+		encInfo := []EncryptionKeyInfo{}
+		for _, k := range m.messageMetadata.EncryptionKeys {
+			key := NewEncryptionKeyInfo(k.GetKey(), k.GetValue(), getKeyMetaMap(k.GetMetadata()))
+			encInfo = append(encInfo, *key)
+		}
+		return encInfo
+	}
+	return nil
+}
+
+func (m *MessageMetadata) UpsertEncryptionkey(keyInfo EncryptionKeyInfo) {
+	if m.messageMetadata != nil {
+		idx := m.encryptionKeyPresent(keyInfo)
+		newKey := &pb.EncryptionKeys{
+			Key:      &keyInfo.name,
+			Value:    keyInfo.Key(),
+			Metadata: getKeyMeta(keyInfo.Metadata()),
+		}
+
+		if idx >= 0 {
+			m.messageMetadata.EncryptionKeys[idx] = newKey
+		} else {
+			m.messageMetadata.EncryptionKeys = append(m.messageMetadata.EncryptionKeys, newKey)
+		}
+	}
+}
+
+func (m *MessageMetadata) EncryptionParam() []byte {
+	if m.messageMetadata != nil {
+		return m.messageMetadata.EncryptionParam
+	}
+	return nil
+}
+
+func (m *MessageMetadata) SetEncryptionParam(param []byte) {
+	if m.messageMetadata != nil {
+		m.messageMetadata.EncryptionParam = param
+	}
+}
+
+func (m *MessageMetadata) encryptionKeyPresent(keyInfo EncryptionKeyInfo) int {
+	if len(m.messageMetadata.EncryptionKeys) > 0 {
+		for idx, k := range m.messageMetadata.EncryptionKeys {
+			if k.GetKey() == keyInfo.Name() {
+				return idx
+			}
+		}
+	}
+	return -1
+}
+
+func getKeyMeta(metaMap map[string]string) []*pb.KeyValue {
+	if len(metaMap) > 0 {
+		meta := []*pb.KeyValue{}
+		for k, v := range metaMap {
+			meta = append(meta, &pb.KeyValue{Key: &k, Value: &v})
+		}
+		return meta
+	}
+	return nil
+}
+
+func getKeyMetaMap(keyValues []*pb.KeyValue) map[string]string {
+	if keyValues != nil {
+		meta := map[string]string{}
+		for _, kv := range keyValues {
+			meta[kv.GetKey()] = kv.GetValue()
+		}
+		return meta
+	}
+	return nil
+}
diff --git a/pulsar/crypto/message_metadata_test.go b/pulsar/crypto/message_metadata_test.go
new file mode 100644
index 0000000..3e8cb38
--- /dev/null
+++ b/pulsar/crypto/message_metadata_test.go
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	"testing"
+
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestGetEncyptionKeys(t *testing.T) {
+	msgMetadata := &pb.MessageMetadata{}
+	name1 := "key-1"
+	value1 := []byte{1, 2, 3, 4}
+
+	name2 := "key-2"
+	value2 := []byte{4, 3, 2, 1}
+
+	name3 := "key-3"
+	value3 := []byte{6, 7, 8, 9}
+
+	msgMetadata.EncryptionKeys = append(msgMetadata.EncryptionKeys, &pb.EncryptionKeys{
+		Key:   &name1,
+		Value: value1,
+		Metadata: []*pb.KeyValue{
+			{Key: &name1, Value: &name1},
+		},
+	},
+		&pb.EncryptionKeys{
+			Key:   &name2,
+			Value: value2,
+			Metadata: []*pb.KeyValue{
+				{Key: &name1, Value: &name1},
+				{Key: &name2, Value: &name2},
+			},
+		},
+		&pb.EncryptionKeys{
+			Key:   &name3,
+			Value: value3,
+		},
+	)
+
+	expected := []EncryptionKeyInfo{
+		{
+			name: name1,
+			key:  value1,
+			metadata: map[string]string{
+				"key-1": "key-1",
+			},
+		},
+		{
+			name: name2,
+			key:  value2,
+			metadata: map[string]string{
+				"key-1": "key-1",
+				"key-2": "key-2",
+			},
+		},
+		{
+			name: name3,
+			key:  value3,
+		},
+	}
+
+	msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+	actual := msgMetadataSupplier.EncryptionKeys()
+
+	assert.EqualValues(t, expected, actual)
+}
+
+func TestUpsertEncryptionKey(t *testing.T) {
+	msgMetadata := &pb.MessageMetadata{}
+	key1 := "key-1"
+	value1 := []byte{1, 2, 3, 4}
+
+	keyInfo := NewEncryptionKeyInfo(key1, value1, map[string]string{"key-1": "value-1"})
+
+	expected := []EncryptionKeyInfo{*keyInfo}
+
+	msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+
+	msgMetadataSupplier.UpsertEncryptionkey(*keyInfo)
+
+	// try to add same key again
+	msgMetadataSupplier.UpsertEncryptionkey(*keyInfo)
+
+	actual := msgMetadataSupplier.EncryptionKeys()
+
+	assert.EqualValues(t, expected, actual)
+}
+
+func TestEncryptionParam(t *testing.T) {
+	msgMetadata := &pb.MessageMetadata{}
+
+	expected := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}
+
+	msgMetadataSupplier := NewMessageMetadataSupplier(msgMetadata)
+	msgMetadataSupplier.SetEncryptionParam(expected)
+
+	assert.EqualValues(t, expected, msgMetadataSupplier.EncryptionParam())
+}
diff --git a/pulsar/crypto/testdata/pri_key_rsa.pem b/pulsar/crypto/testdata/pri_key_rsa.pem
new file mode 100644
index 0000000..8b184ff
--- /dev/null
+++ b/pulsar/crypto/testdata/pri_key_rsa.pem
@@ -0,0 +1,39 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIG4wIBAAKCAYEA5YeHRDCRtEn8kVi7xVT2YicRByZZUPjzB9qSlQboHvdIpZhW
+hoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl3J67Pz3H5j4FoKFZ7OTf55Wk2f83
+pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiIhSg/+6wjY98SbZujM1a/iWpM9IFk
+pOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT86gYh1BgsVH4zjQqCP76lxXrsSyn
+lsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0iWcK+OICt0z9WesakKnyAU01BFOmX
+APuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ7bXBxKVIw24za/EsCeb+jLFXVilC
+/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXeQinjmx+iFWvG/uHimrswXaAQbP1Y
+OoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3di2zxEIn7cnLaIC4HgcVxQLUVMsBX
+FXgU2SWVWPb2+fv9AgMBAAECggGAIohVbYjxIvLOsP9soK+7seC2yyCV53zM862K
+yCkV2zLRFzjoK6zW+l6UNlVg5QPuDSlVogVPUfPy1sJCkrrqylqHSWh+HsHODvC3
+mtCEb08wUiv3r1toi+Vod9573/fX+8n9NeOkVjXxA+a8L1NVVtNLbQ3k9S2mlB8K
+Wrr+bio0EcU14ymbvm2hbntMhLZO5iB5vIVwMqTq3MhMdEV5q/rqVdFd+s+eY0et
+21ShVwHRSAKz5Bc5GdOYAV/cYTdaXyR3RJUxy+JT0QW5qJ7FKGa7WIvh2epIdQtf
+N79aXQ6ZpQK8aghd6hintV8pJgxD2EcRaT2AAHmuLj1P74YO7WJJnQy/eXhdq2jl
+xmTqk9TgFQksvin8QtJyFQzFvxWUKN4z7NAnFlDCM7Yj/k8QnA85kWtyBpBuCFhr
+/fapz59Kg5apaAlWKFe9piTezQim8CNgPjdhDjyo7Yyy/rr6XI1ncmC+xKnWpyeH
+gXPxDT+7pC1TAU6DC8DRSizh5DGBAoHBAP3JFDSSwwNP5UP9P2EmslhcKR3Ks6rY
+XuUotK7yy5sXtkxW2+xGOaKyrCjw4WSXqx0FgGndpejG93bBLM+SwhT/OYZYAqju
+f2B9iOFIDshSumpCor19FBn4fnb933RkNS/O9k4iQye6He1L8hleBHMoqDJmFbk2
+vESwNLYrpxuQVfjMbWsLoamwRs2gqwYtd4LvSfwtwxLtTozZhbMufRaTb610VDlZ
+iALDTvWRpGjDLaSoz+agaSDalGHVmnfmhQKBwQDniEPMKAiQM6Nk1jsWfFfu0E/0
+0v55F99naAdJuaG0IkJNtD0WCbjtM25gk82hbcP7XP7Sz+ZaNu8Q16xOpOYvtgn5
+Yqd1/5Hh8oQf7lwRPFfpr/PeHjW1f4qCQokPgSut3E8nKKfQKhhRajJH78JBqfPW
+Q7A90W4zBC62sTDRqbAzkDq1OKlTu5HyuNGmPqnum8olIgsaOsoBIpsCqfeuFElq
+WXCpw8NKOaH8YQP1hd+PhVKrHcNW/EFhhjOqZRkCgcAPcXD9UgDz7qSw4nQ84THx
+FoqZ+X+9YbVElJmKG9Qv991r/80aL5vKPr0jMKVGjcQn2/HYf1hdNd5RJ6gmaXPN
++0nw1uIyjXDK2li9/LiJkB8v4CYvCbFzcx+e9gvm9UIXSqzKTGNxw22WxwxQZtw2
+db7mcjfYMXB7bY2HmFhu4PWaUjZGUUrhHIzyblh548JmAVGrOs2oFTC2eXYdVTLf
+cNFW6MFHTB9uq5vebaJnjZj1cCBWlGRRT3vACFOCAFECgcAlJdzS3b15/X8Cx8iV
+NAAbxfp+Kng/z4+9lJhOwOTr9O80bm26ona0QCM+hZhhhS4Dn4kXI9ousU+sIR55
+Q8XW89sn0ydRLF8opHOEeAb3kPn9+YgkJC6z3zHG8ovxG+V5MLbWbpR2NrrOHT7S
+AermBDGmOBgH4xlOQCaKk2VkzlgB/esddmjckWS6T+L7TGSRbxeA27RyUeplQjsi
+s0iU+pZI5O7JniowN40A5EPxWbhj251G7TCRPUn0LscNWMkCgcEA6BxNQ907FfQu
++srCpZcmthxKdm8cLrONT/U9cc5tJBftL/zlSvGnhU+rFEt/nLOYoroa9dhZlM8/
+i3BXZofMQ36Q76QIaN41fbzzwpDz5aIFlN/cvASk5Yspdv3Lq1dis0FzAcHFweWU
+4jw9m4f9nn81b3QiAjT5aB1Ftqeu9L9r4EZHqAjU+iBN/HpF6W7YPhKKX7VNkX7g
+uUZjERpZq5kg7DWWqhcwLc2ztiRrBmT964Q6CnRTzvYmdyaqyQni
+-----END RSA PRIVATE KEY-----
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/pub_key_rsa.pem b/pulsar/crypto/testdata/pub_key_rsa.pem
new file mode 100644
index 0000000..8e49886
--- /dev/null
+++ b/pulsar/crypto/testdata/pub_key_rsa.pem
@@ -0,0 +1,11 @@
+-----BEGIN PUBLIC KEY-----
+MIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA5YeHRDCRtEn8kVi7xVT2
+YicRByZZUPjzB9qSlQboHvdIpZhWhoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl
+3J67Pz3H5j4FoKFZ7OTf55Wk2f83pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiI
+hSg/+6wjY98SbZujM1a/iWpM9IFkpOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT
+86gYh1BgsVH4zjQqCP76lxXrsSynlsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0i
+WcK+OICt0z9WesakKnyAU01BFOmXAPuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ
+7bXBxKVIw24za/EsCeb+jLFXVilC/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXe
+Qinjmx+iFWvG/uHimrswXaAQbP1YOoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3d
+i2zxEIn7cnLaIC4HgcVxQLUVMsBXFXgU2SWVWPb2+fv9AgMBAAE=
+-----END PUBLIC KEY-----
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/truncated_pub_key_rsa.pem b/pulsar/crypto/testdata/truncated_pub_key_rsa.pem
new file mode 100644
index 0000000..51a0381
--- /dev/null
+++ b/pulsar/crypto/testdata/truncated_pub_key_rsa.pem
@@ -0,0 +1,10 @@
+-----BEGIN PUBLIC KEY-----
+MIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA5YeHRDCRtEn8kVi7xVT2
+YicRByZZUPjzB9qSlQboHvdIpZhWhoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl
+3J67Pz3H5j4FoKFZ7OTf55Wk2f83pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiI
+hSg/+6wjY98SbZujM1a/iWpM9IFkpOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT
+86gYh1BgsVH4zjQqCP76lxXrsSynlsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0i
+WcK+OICt0z9WesakKnyAU01BFOmXAPuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ
+7bXBxKVIw24za/EsCeb+jLFXVilC/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXe
+Qinjmx+iFWvG/uHimrswXaAQbP1YOoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3d
+-----END PUBLIC KEY-----
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/wrong_encode_pub_key_rsa.pem b/pulsar/crypto/testdata/wrong_encode_pub_key_rsa.pem
new file mode 100644
index 0000000..3d40cc2
--- /dev/null
+++ b/pulsar/crypto/testdata/wrong_encode_pub_key_rsa.pem
@@ -0,0 +1,9 @@
+-----BEGIN PUBLIC KEY-----
+MIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA5YeHRDCRtEn8kVi7xVT2
+YicRByZZUPjzB9qSlQboHvdIpZhWhoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl
+3J67Pz3H5j4FoKFZ7OTf55Wk2f83pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiI
+hSg/+6wjY98SbZujM1a/iWpM9IFkpOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT
+86gYh1BgsVH4zjQqCP76lxXrsSynlsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0i
+WcK+OICt0z9WesakKnyAU01BFOmXAPuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ
+7bXBxKVIw24za/EsCeb+jLFXVilC/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXe
+Qinjmx+iFWvG/uHimrswXaAQbP1YOoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3d
\ No newline at end of file
diff --git a/pulsar/crypto/testdata/wrong_encoded_pri_key_rsa.pem b/pulsar/crypto/testdata/wrong_encoded_pri_key_rsa.pem
new file mode 100644
index 0000000..2566ae3
--- /dev/null
+++ b/pulsar/crypto/testdata/wrong_encoded_pri_key_rsa.pem
@@ -0,0 +1,38 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIG4wIBAAKCAYEA5YeHRDCRtEn8kVi7xVT2YicRByZZUPjzB9qSlQboHvdIpZhW
+hoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl3J67Pz3H5j4FoKFZ7OTf55Wk2f83
+pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiIhSg/+6wjY98SbZujM1a/iWpM9IFk
+pOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT86gYh1BgsVH4zjQqCP76lxXrsSyn
+lsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0iWcK+OICt0z9WesakKnyAU01BFOmX
+APuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ7bXBxKVIw24za/EsCeb+jLFXVilC
+/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXeQinjmx+iFWvG/uHimrswXaAQbP1Y
+OoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3di2zxEIn7cnLaIC4HgcVxQLUVMsBX
+FXgU2SWVWPb2+fv9AgMBAAECggGAIohVbYjxIvLOsP9soK+7seC2yyCV53zM862K
+yCkV2zLRFzjoK6zW+l6UNlVg5QPuDSlVogVPUfPy1sJCkrrqylqHSWh+HsHODvC3
+mtCEb08wUiv3r1toi+Vod9573/fX+8n9NeOkVjXxA+a8L1NVVtNLbQ3k9S2mlB8K
+Wrr+bio0EcU14ymbvm2hbntMhLZO5iB5vIVwMqTq3MhMdEV5q/rqVdFd+s+eY0et
+21ShVwHRSAKz5Bc5GdOYAV/cYTdaXyR3RJUxy+JT0QW5qJ7FKGa7WIvh2epIdQtf
+N79aXQ6ZpQK8aghd6hintV8pJgxD2EcRaT2AAHmuLj1P74YO7WJJnQy/eXhdq2jl
+xmTqk9TgFQksvin8QtJyFQzFvxWUKN4z7NAnFlDCM7Yj/k8QnA85kWtyBpBuCFhr
+/fapz59Kg5apaAlWKFe9piTezQim8CNgPjdhDjyo7Yyy/rr6XI1ncmC+xKnWpyeH
+gXPxDT+7pC1TAU6DC8DRSizh5DGBAoHBAP3JFDSSwwNP5UP9P2EmslhcKR3Ks6rY
+XuUotK7yy5sXtkxW2+xGOaKyrCjw4WSXqx0FgGndpejG93bBLM+SwhT/OYZYAqju
+f2B9iOFIDshSumpCor19FBn4fnb933RkNS/O9k4iQye6He1L8hleBHMoqDJmFbk2
+vESwNLYrpxuQVfjMbWsLoamwRs2gqwYtd4LvSfwtwxLtTozZhbMufRaTb610VDlZ
+iALDTvWRpGjDLaSoz+agaSDalGHVmnfmhQKBwQDniEPMKAiQM6Nk1jsWfFfu0E/0
+0v55F99naAdJuaG0IkJNtD0WCbjtM25gk82hbcP7XP7Sz+ZaNu8Q16xOpOYvtgn5
+Yqd1/5Hh8oQf7lwRPFfpr/PeHjW1f4qCQokPgSut3E8nKKfQKhhRajJH78JBqfPW
+Q7A90W4zBC62sTDRqbAzkDq1OKlTu5HyuNGmPqnum8olIgsaOsoBIpsCqfeuFElq
+WXCpw8NKOaH8YQP1hd+PhVKrHcNW/EFhhjOqZRkCgcAPcXD9UgDz7qSw4nQ84THx
+FoqZ+X+9YbVElJmKG9Qv991r/80aL5vKPr0jMKVGjcQn2/HYf1hdNd5RJ6gmaXPN
++0nw1uIyjXDK2li9/LiJkB8v4CYvCbFzcx+e9gvm9UIXSqzKTGNxw22WxwxQZtw2
+db7mcjfYMXB7bY2HmFhu4PWaUjZGUUrhHIzyblh548JmAVGrOs2oFTC2eXYdVTLf
+cNFW6MFHTB9uq5vebaJnjZj1cCBWlGRRT3vACFOCAFECgcAlJdzS3b15/X8Cx8iV
+NAAbxfp+Kng/z4+9lJhOwOTr9O80bm26ona0QCM+hZhhhS4Dn4kXI9ousU+sIR55
+Q8XW89sn0ydRLF8opHOEeAb3kPn9+YgkJC6z3zHG8ovxG+V5MLbWbpR2NrrOHT7S
+AermBDGmOBgH4xlOQCaKk2VkzlgB/esddmjckWS6T+L7TGSRbxeA27RyUeplQjsi
+s0iU+pZI5O7JniowN40A5EPxWbhj251G7TCRPUn0LscNWMkCgcEA6BxNQ907FfQu
++srCpZcmthxKdm8cLrONT/U9cc5tJBftL/zlSvGnhU+rFEt/nLOYoroa9dhZlM8/
+i3BXZofMQ36Q76QIaN41fbzzwpDz5aIFlN/cvASk5Yspdv3Lq1dis0FzAcHFweWU
+4jw9m4f9nn81b3QiAjT5aB1Ftqeu9L9r4EZHqAjU+iBN/HpF6W7YPhKKX7VNkX7g
+uUZjERpZq5kg7DWWqhcwLc2ztiRrBmT964Q6CnRTzvYmdyaqyQni
\ No newline at end of file