blob: 920e3fde45c45babe9983e8b42e1bf9b23f48c79 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
#include "MessageCrypto.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include "LogUtils.h"
#include "PulsarApi.pb.h"
namespace pulsar {
MessageCrypto::MessageCrypto(const std::string& logCtx, bool keyGenNeeded)
: dataKeyLen_(32),
dataKey_(new unsigned char[dataKeyLen_]),
iv_(new unsigned char[ivLen_]),
logCtx_(logCtx) {
if (!keyGenNeeded) {
mdCtx_ = EVP_MD_CTX_create();
RAND_bytes(dataKey_.get(), dataKeyLen_);
RAND_bytes(iv_.get(), ivLen_);
MessageCrypto::~MessageCrypto() {}
RSA* MessageCrypto::loadPublicKey(std::string& pubKeyStr) {
BIO* pubBio = NULL;
RSA* rsaPub = NULL;
pubBio = BIO_new_mem_buf((char*)pubKeyStr.c_str(), -1);
if (pubBio == NULL) {
LOG_ERROR(logCtx_ << " Failed to get memory for public key");
return rsaPub;
rsaPub = PEM_read_bio_RSA_PUBKEY(pubBio, NULL, NULL, NULL);
if (rsaPub == NULL) {
LOG_ERROR(logCtx_ << " Failed to load public key");
return rsaPub;
RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) {
BIO* privBio = NULL;
RSA* rsaPriv = NULL;
privBio = BIO_new_mem_buf((char*)privateKeyStr.c_str(), -1);
if (privBio == NULL) {
LOG_ERROR(logCtx_ << " Failed to get memory for private key");
return rsaPriv;
rsaPriv = PEM_read_bio_RSAPrivateKey(privBio, NULL, NULL, NULL);
if (rsaPriv == NULL) {
LOG_ERROR(logCtx_ << " Failed to load private key");
return rsaPriv;
bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen,
unsigned char keyDigest[], unsigned int& digestLen) {
if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) {
LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << keyName);
return false;
digestLen = 0;
if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) {
LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << keyName);
return false;
if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) {
LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << keyName);
return false;
return true;
void MessageCrypto::removeExpiredDataKey() {
boost::posix_time::ptime now = boost::posix_time::second_clock::universal_time();
boost::posix_time::time_duration expireTime = boost::posix_time::hours(4);
auto dataKeyCacheIter = dataKeyCache_.begin();
while (dataKeyCacheIter != dataKeyCache_.end()) {
const auto dataKeyEntry = dataKeyCacheIter->second;
if ((now - dataKeyEntry.second) > expireTime) {
} else {
std::string MessageCrypto::stringToHex(const char* inputStr, size_t len) {
static const char* hexVals = "0123456789ABCDEF";
std::string outHex;
outHex.reserve(2 * len + 2);
for (size_t i = 0; i < len; ++i) {
const unsigned char c = *(inputStr + i);
outHex.push_back(hexVals[c >> 4]);
outHex.push_back(hexVals[c & 15]);
return outHex;
std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t len) {
return stringToHex(inputStr.c_str(), len);
Result MessageCrypto::addPublicKeyCipher(const std::set<std::string>& keyNames,
const CryptoKeyReaderPtr keyReader) {
Lock lock(mutex_);
// Generate data key
RAND_bytes(dataKey_.get(), dataKeyLen_);
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string dataKeyStr(reinterpret_cast<char*>(dataKey_.get()), dataKeyLen_);
std::string strHex = stringToHex(dataKeyStr, dataKeyStr.size());
LOG_DEBUG(logCtx_ << "Generated Data key " << strHex);
Result result = ResultOk;
for (auto it = keyNames.begin(); it != keyNames.end(); it++) {
result = addPublicKeyCipher(*it, keyReader);
if (result != ResultOk) {
return result;
return result;
Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) {
if (keyName.empty()) {
LOG_ERROR(logCtx_ << "Keyname is empty ");
return ResultCryptoError;
// Read the public key and its info using callback
StringMap keyMeta;
EncryptionKeyInfo keyInfo;
Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo);
if (result != ResultOk) {
LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key " << keyName);
return result;
RSA* pubKey = loadPublicKey(keyInfo.getKey());
if (pubKey == NULL) {
LOG_ERROR(logCtx_ << "Failed to load public key " << keyName);
return ResultCryptoError;
LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully.");
int inSize = RSA_size(pubKey);
boost::scoped_array<unsigned char> encryptedKey(new unsigned char[inSize]);
int outSize =
RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING);
if (inSize != outSize) {
LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key length for key " << keyName);
return ResultCryptoError;
std::string encryptedKeyStr(reinterpret_cast<char*>(encryptedKey.get()), inSize);
std::shared_ptr<EncryptionKeyInfo> eki(new EncryptionKeyInfo());
// Add a new entry or replace existing entry, if one is present.
encryptedDataKeyMap_[keyName] = eki;
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strHex = stringToHex(encryptedKeyStr, encryptedKeyStr.size());
LOG_DEBUG(logCtx_ << " Data key encrypted for key " << keyName
<< ". Encrypted key size = " << encryptedKeyStr.size() << ", value = " << strHex);
return ResultOk;
bool MessageCrypto::removeKeyCipher(const std::string& keyName) {
if (!keyName.size()) {
return false;
return true;
bool MessageCrypto::encrypt(const std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader,
proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
SharedBuffer& encryptedPayload) {
if (!encKeys.size()) {
return false;
Lock lock(mutex_);
// Update message metadata with encrypted data key
for (auto it = encKeys.begin(); it != encKeys.end(); it++) {
const std::string& keyName = *it;
auto keyInfoIter = encryptedDataKeyMap_.find(keyName);
if (keyInfoIter == encryptedDataKeyMap_.end()) {
// Attempt to load the key. This will allow us to load keys as soon as
// a new key is added to producer config
Result result = addPublicKeyCipher(keyName, keyReader);
if (result != ResultOk) {
return false;
keyInfoIter = encryptedDataKeyMap_.find(keyName);
if (keyInfoIter == encryptedDataKeyMap_.end()) {
LOG_ERROR(logCtx_ << "Unable to find encrypted data key for " << keyName);
return false;
EncryptionKeyInfo* keyInfo = keyInfoIter->second.get();
proto::EncryptionKeys* encKeys = proto::EncryptionKeys().New();
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strHex = stringToHex(keyInfo->getKey(), keyInfo->getKey().size());
LOG_DEBUG(logCtx_ << " Encrypted data key added for key " << keyName << ". Encrypted key size = "
<< keyInfo->getKey().size() << ", value = " << strHex);
if (keyInfo->getMetadata().size()) {
for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end();
metaIter++) {
proto::KeyValue* keyValue = proto::KeyValue().New();
LOG_DEBUG(logCtx_ << " Adding metadata for key " << keyName << ". Metadata key = "
<< metaIter->first << ", value =" << metaIter->second);
// TODO: Replace random with counter and periodic refreshing based on timer/counter value
RAND_bytes(iv_.get(), ivLen_);
msgMetadata.set_encryption_param(reinterpret_cast<char*>(iv_.get()), ivLen_);
encryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
int encLen = 0;
if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
LOG_ERROR(logCtx_ << " Failed to cipher ctx.");
return false;
if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) {
LOG_ERROR(logCtx_ << " Failed to init cipher ctx.");
return false;
if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
LOG_ERROR(logCtx_ << " Failed to set cipher padding.");
return false;
if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
&encLen, reinterpret_cast<unsigned const char*>(,
payload.readableBytes()) != 1) {
LOG_ERROR(logCtx_ << " Failed to encrypt payload.");
return false;
encLen = 0;
if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
&encLen) != 1) {
LOG_ERROR(logCtx_ << " Failed to finalize encryption.");
return false;
if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) {
LOG_ERROR(logCtx_ << " Failed to get cipher tag info.");
return false;
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strPayloadHex = stringToHex(, payload.readableBytes());
std::string strHex = stringToHex(, encryptedPayload.readableBytes());
LOG_DEBUG(logCtx_ << " Original size = " << payload.readableBytes() << ", value = " << strPayloadHex
<< ". Encrypted size " << encryptedPayload.readableBytes()
<< ", value =" << strHex);
return true;
bool MessageCrypto::decryptDataKey(const proto::EncryptionKeys& encKeys, const CryptoKeyReader& keyReader) {
const auto& keyName = encKeys.key();
const auto& encryptedDataKey = encKeys.value();
const auto& encKeyMeta = encKeys.metadata();
StringMap keyMeta;
for (auto iter = encKeyMeta.begin(); iter != encKeyMeta.end(); iter++) {
keyMeta[iter->key()] = iter->value();
// Read the private key info using callback
EncryptionKeyInfo keyInfo;
keyReader.getPrivateKey(keyName, keyMeta, keyInfo);
// Convert key from string to RSA key
RSA* privKey = loadPrivateKey(keyInfo.getKey());
if (privKey == NULL) {
LOG_ERROR(logCtx_ << " Failed to load private key " << keyName);
return false;
LOG_DEBUG(logCtx_ << " Private key " << keyName << " loaded successfully.");
// Decrypt data key
int outSize = RSA_private_decrypt(encryptedDataKey.size(),
reinterpret_cast<unsigned const char*>(encryptedDataKey.c_str()),
dataKey_.get(), privKey, RSA_PKCS1_OAEP_PADDING);
if (outSize == -1) {
LOG_ERROR(logCtx_ << "Failed to decrypt AES key for " << keyName);
return false;
unsigned char keyDigest[EVP_MAX_MD_SIZE];
unsigned int digestLen = 0;
if (!getDigest(keyName, encryptedDataKey.c_str(), encryptedDataKey.size(), keyDigest, digestLen)) {
LOG_ERROR(logCtx_ << "Failed to get digest for data key " << keyName);
return false;
std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen);
std::string dataKeyStr(reinterpret_cast<char*>(dataKey_.get()), dataKeyLen_);
dataKeyCache_[keyDigestStr] = make_pair(dataKeyStr, boost::posix_time::second_clock::universal_time());
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strHex = stringToHex(dataKeyStr, dataKeyStr.size());
LOG_DEBUG(logCtx_ << "Data key for key " << keyName << " decrypted. Decrypted data key is "
<< strHex);
// Remove expired entries from the cache
return true;
bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata,
SharedBuffer& payload, SharedBuffer& decryptedPayload) {
// unpack iv and encrypted data
decryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strHex = stringToHex(, payload.readableBytes());
LOG_DEBUG(logCtx_ << "Attempting to decrypt data with encrypted size " << payload.readableBytes()
<< ", data = " << strHex);
if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
LOG_ERROR(logCtx_ << " Failed to get cipher ctx");
return false;
if (!EVP_DecryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL,
reinterpret_cast<unsigned const char*>(dataKeySecret.c_str()),
reinterpret_cast<unsigned const char*>(iv_.get()))) {
LOG_ERROR(logCtx_ << " Failed to init decrypt cipher ctx");
return false;
if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
LOG_ERROR(logCtx_ << " Failed to set cipher padding");
return false;
int cipherLen = payload.readableBytes() - tagLen_;
int decLen = 0;
if (!EVP_DecryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()),
&decLen, reinterpret_cast<unsigned const char*>(, cipherLen)) {
LOG_ERROR(logCtx_ << " Failed to decrypt update");
return false;
if (!EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_SET_TAG, tagLen_, (void*)( + cipherLen))) {
LOG_ERROR(logCtx_ << " Failed to set gcm tag");
return false;
if (!EVP_DecryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()),
&decLen)) {
LOG_ERROR(logCtx_ << " Failed to finalize encrypted message");
return false;
if (PULSAR_UNLIKELY(logger()->isEnabled(Logger::LEVEL_DEBUG))) {
std::string strHex = stringToHex(, decryptedPayload.readableBytes());
LOG_DEBUG(logCtx_ << "Data decrypted. Decrypted size = " << decryptedPayload.readableBytes()
<< ", data = " << strHex);
return true;
bool MessageCrypto::getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
SharedBuffer& decryptedPayload) {
SharedBuffer decryptedData;
bool dataDecrypted = false;
for (auto iter = msgMetadata.encryption_keys().begin(); iter != msgMetadata.encryption_keys().end();
iter++) {
const std::string& keyName = iter->key();
const std::string& encDataKey = iter->value();
unsigned char keyDigest[EVP_MAX_MD_SIZE];
unsigned int digestLen = 0;
getDigest(keyName, encDataKey.c_str(), encDataKey.size(), keyDigest, digestLen);
std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen);
auto dataKeyCacheIter = dataKeyCache_.find(keyDigestStr);
if (dataKeyCacheIter != dataKeyCache_.end()) {
// Taking a small performance hit here if the hash collides. When it
// retruns a different key, decryption fails. At this point, we would
// call decryptDataKey to refresh the cache and come here again to decrypt.
auto dataKeyEntry = dataKeyCacheIter->second;
if (decryptData(dataKeyEntry.first, msgMetadata, payload, decryptedPayload)) {
dataDecrypted = true;
} else {
// First time, entry won't be present in cache
LOG_DEBUG(logCtx_ << " Failed to decrypt data or data key is not in cache for "
<< keyName + ". Will attempt to refresh.");
return dataDecrypted;
bool MessageCrypto::decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload) {
// Attempt to decrypt using the existing key
if (getKeyAndDecryptData(msgMetadata, payload, decryptedPayload)) {
return true;
// Either first time, or decryption failed. Attempt to regenerate data key
bool isDataKeyDecrypted = false;
for (int index = 0; index < msgMetadata.encryption_keys_size(); index++) {
const proto::EncryptionKeys& encKeys = msgMetadata.encryption_keys(index);
if (decryptDataKey(encKeys, *keyReader)) {
isDataKeyDecrypted = true;
if (!isDataKeyDecrypted) {
// Unable to decrypt data key
return false;
return getKeyAndDecryptData(msgMetadata, payload, decryptedPayload);
} /* namespace pulsar */