Encryption support producer (#560)
* add ability to encrypt messages
- use base crypto package for encryption
* fix typo
* lint fixes
* address review suggestions
* revert go mod
* remove encryption context
- move it to Consumer MR
* try to fix check issues
* remove unused code
* remove embedded crypto struct
* review suggestions
* remove duplicate log
* lint code style issue fix
* return error from flush methods on serialization error
* update test case and do lazy data key generation
* address review changes
* add comments on test case
Co-authored-by: PGarule <PGarule@fanatics.com>
diff --git a/go.mod b/go.mod
index 67a7bb7..354f5b4 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,6 @@
github.com/google/uuid v1.1.2
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/klauspost/compress v1.10.8
- github.com/kr/pretty v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
@@ -27,7 +26,6 @@
github.com/stretchr/testify v1.5.1
go.uber.org/atomic v1.7.0
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
- gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index 2ccd39e..8b372c7 100644
--- a/go.sum
+++ b/go.sum
@@ -22,7 +22,6 @@
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
-github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
@@ -32,8 +31,6 @@
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
-github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
-github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
@@ -73,8 +70,6 @@
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
-github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
@@ -82,7 +77,6 @@
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
-github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
github.com/jawher/mow.cli v1.2.0/go.mod h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
@@ -169,8 +163,6 @@
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
-github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
-github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
@@ -190,7 +182,6 @@
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
@@ -212,7 +203,6 @@
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -227,7 +217,6 @@
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -259,9 +248,5 @@
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/pulsar/encryption.go b/pulsar/encryption.go
new file mode 100644
index 0000000..aade2ca
--- /dev/null
+++ b/pulsar/encryption.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "github.com/apache/pulsar-client-go/pulsar/crypto"
+
+// ProducerEncryptionInfo encryption related fields required by the producer
+type ProducerEncryptionInfo struct {
+ // KeyReader read RSA public/private key pairs
+ KeyReader crypto.KeyReader
+
+ // MessageCrypto used to encrypt and decrypt the data and session keys
+ MessageCrypto crypto.MessageCrypto
+
+ // Keys list of encryption key names to encrypt session key
+ Keys []string
+
+ // ProducerCryptoFailureAction action to be taken on failure of message encryption
+ // default is ProducerCryptoFailureActionFail
+ ProducerCryptoFailureAction int
+}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 3e1601f..92d6249 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -23,6 +23,7 @@
"github.com/gogo/protobuf/proto"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+ "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)
@@ -35,7 +36,7 @@
type BatcherBuilderProvider func(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger,
+ bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error)
// BatchBuilder is a interface of batch builders
@@ -51,12 +52,12 @@
) bool
// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
- Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{})
+ Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, err error)
// Flush all the messages buffered in multiple batches and wait until all
// messages have been successfully persisted.
FlushBatches() (
- batchData []Buffer, sequenceID []uint64, callbacks [][]interface{},
+ batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error,
)
// Return the batch container batch message in multiple batches.
@@ -93,13 +94,15 @@
buffersPool BuffersPool
log log.Logger
+
+ encryptor crypto.Encryptor
}
// newBatchContainer init a batchContainer
func newBatchContainer(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger,
+ bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) batchContainer {
bc := batchContainer{
@@ -122,6 +125,7 @@
compressionProvider: getCompressionProvider(compressionType, level),
buffersPool: bufferPool,
log: logger,
+ encryptor: encryptor,
}
if compressionType != pb.CompressionType_NONE {
@@ -135,12 +139,12 @@
func NewBatchBuilder(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger,
+ bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error) {
bc := newBatchContainer(
maxMessages, maxBatchSize, producerName, producerID, compressionType,
- level, bufferPool, logger,
+ level, bufferPool, logger, encryptor,
)
return &bc, nil
@@ -211,11 +215,11 @@
// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
func (bc *batchContainer) Flush() (
- batchData Buffer, sequenceID uint64, callbacks []interface{},
+ batchData Buffer, sequenceID uint64, callbacks []interface{}, err error,
) {
if bc.numMessages == 0 {
// No-Op for empty batch
- return nil, 0, nil
+ return nil, 0, nil, nil
}
bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages)
@@ -229,19 +233,21 @@
if buffer == nil {
buffer = NewBuffer(int(uncompressedSize * 3 / 2))
}
- serializeBatch(
- buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider,
- )
+
+ if err = serializeBatch(
+ buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, bc.compressionProvider, bc.encryptor,
+ ); err == nil { // no error in serializing Batch
+ sequenceID = bc.cmdSend.Send.GetSequenceId()
+ }
callbacks = bc.callbacks
- sequenceID = bc.cmdSend.Send.GetSequenceId()
bc.reset()
- return buffer, sequenceID, callbacks
+ return buffer, sequenceID, callbacks, err
}
// FlushBatches only for multiple batches container
func (bc *batchContainer) FlushBatches() (
- batchData []Buffer, sequenceID []uint64, callbacks [][]interface{},
+ batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, errors []error,
) {
panic("single batch container not support FlushBatches(), please use Flush() instead")
}
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index af6bac5..b91c0b6 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -24,6 +24,7 @@
"github.com/gogo/protobuf/proto"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+ "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)
@@ -221,9 +222,21 @@
cmdSend *pb.BaseCommand,
msgMetadata *pb.MessageMetadata,
uncompressedPayload Buffer,
- compressionProvider compression.Provider) {
+ compressionProvider compression.Provider,
+ encryptor crypto.Encryptor) error {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+
+ // compress the payload
+ compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
+
+ // encrypt the compressed payload
+ encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
+ if err != nil {
+ // error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
+ return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
+ }
+
cmdSize := uint32(proto.Size(cmdSend))
msgMetadataSize := uint32(proto.Size(msgMetadata))
@@ -234,7 +247,7 @@
// Write cmd
wb.WriteUint32(cmdSize)
wb.ResizeIfNeeded(cmdSize)
- _, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
+ _, err = cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
if err != nil {
panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
}
@@ -255,12 +268,8 @@
}
wb.WrittenBytes(msgMetadataSize)
- // Make sure the buffer has enough space to hold the compressed data
- // and perform the compression in-place
- maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
- wb.ResizeIfNeeded(maxSize)
- b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
- wb.WrittenBytes(uint32(len(b)))
+ // add payload to the buffer
+ wb.Write(encryptedPayload)
// Write checksum at created checksum-placeholder
frameEndIdx := wb.WriterIndex()
@@ -269,6 +278,7 @@
// Set Sizes and checksum in the fixed-size header
wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
wb.PutUint32(checksum, checksumIdx)
+ return nil
}
// ConvertFromStringMap convert a string map to a KeyValue []byte
diff --git a/pulsar/internal/crypto/encryptor.go b/pulsar/internal/crypto/encryptor.go
new file mode 100644
index 0000000..7fdbf06
--- /dev/null
+++ b/pulsar/internal/crypto/encryptor.go
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// Encryptor support encryption
+type Encryptor interface {
+ Encrypt([]byte, *pb.MessageMetadata) ([]byte, error)
+}
diff --git a/pulsar/internal/crypto/noop_encryptor.go b/pulsar/internal/crypto/noop_encryptor.go
new file mode 100644
index 0000000..4512e7b
--- /dev/null
+++ b/pulsar/internal/crypto/noop_encryptor.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+type noopEncryptor struct{}
+
+func NewNoopEncryptor() Encryptor {
+ return &noopEncryptor{}
+}
+
+// Encrypt Noop ecryptor
+func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) {
+ return data, nil
+}
diff --git a/pulsar/internal/crypto/producer_encryptor.go b/pulsar/internal/crypto/producer_encryptor.go
new file mode 100644
index 0000000..a5b972d
--- /dev/null
+++ b/pulsar/internal/crypto/producer_encryptor.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ "fmt"
+
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+type producerEncryptor struct {
+ keys []string
+ keyReader crypto.KeyReader
+ messageCrypto crypto.MessageCrypto
+ logger log.Logger
+ producerCryptoFailureAction int
+}
+
+func NewProducerEncryptor(keys []string,
+ keyReader crypto.KeyReader,
+ messageCrypto crypto.MessageCrypto,
+ producerCryptoFailureAction int,
+ logger log.Logger) Encryptor {
+ return &producerEncryptor{
+ keys: keys,
+ keyReader: keyReader,
+ messageCrypto: messageCrypto,
+ logger: logger,
+ producerCryptoFailureAction: producerCryptoFailureAction,
+ }
+}
+
+// Encrypt producer encryptor
+func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetadata) ([]byte, error) {
+ // encrypt payload
+ encryptedPayload, err := e.messageCrypto.Encrypt(e.keys,
+ e.keyReader,
+ crypto.NewMessageMetadataSupplier(msgMetadata),
+ payload)
+
+ // error encryping the payload
+ if err != nil {
+ // error occurred in encrypting the payload
+ // crypto ProducerCryptoFailureAction is set to send
+ // send unencrypted message
+ if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
+ e.logger.
+ WithError(err).
+ Warnf("Encryption failed for payload sending unencrypted message ProducerCryptoFailureAction is set to send")
+ return payload, nil
+ }
+
+ return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
+ }
+ return encryptedPayload, nil
+}
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 545c2c8..940aa9f 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -24,6 +24,7 @@
"time"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+ "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
)
@@ -85,14 +86,14 @@
func NewKeyBasedBatchBuilder(
maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool, logger log.Logger,
+ bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
) (BatchBuilder, error) {
bb := &keyBasedBatchContainer{
batches: newKeyBasedBatches(),
batchContainer: newBatchContainer(
maxMessages, maxBatchSize, producerName, producerID,
- compressionType, level, bufferPool, logger,
+ compressionType, level, bufferPool, logger, encryptor,
),
compressionType: compressionType,
level: level,
@@ -144,7 +145,7 @@
// create batchContainer for new key
t := newBatchContainer(
bc.maxMessages, bc.maxBatchSize, bc.producerName, bc.producerID,
- bc.compressionType, bc.level, bc.buffersPool, bc.log,
+ bc.compressionType, bc.level, bc.buffersPool, bc.log, bc.encryptor,
)
batchPart = &t
bc.batches.Add(msgKey, &t)
@@ -179,11 +180,11 @@
// Flush all the messages buffered in multiple batches and wait until all
// messages have been successfully persisted.
func (bc *keyBasedBatchContainer) FlushBatches() (
- batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{},
+ batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, errors []error,
) {
if bc.numMessages == 0 {
// No-Op for empty batch
- return nil, nil, nil
+ return nil, nil, nil, nil
}
bc.log.Debug("keyBasedBatchContainer flush: messages: ", bc.numMessages)
@@ -194,6 +195,7 @@
batchesData = make([]Buffer, batchesLen)
sequenceIDs = make([]uint64, batchesLen)
callbacks = make([][]interface{}, batchesLen)
+ errors = make([]error, batchesLen)
bc.batches.l.RLock()
defer bc.batches.l.RUnlock()
@@ -203,21 +205,22 @@
sort.Strings(sortedKeys)
for _, k := range sortedKeys {
container := bc.batches.containers[k]
- b, s, c := container.Flush()
+ b, s, c, err := container.Flush()
if b != nil {
batchesData[idx] = b
sequenceIDs[idx] = s
callbacks[idx] = c
+ errors[idx] = err
}
idx++
}
bc.reset()
- return batchesData, sequenceIDs, callbacks
+ return batchesData, sequenceIDs, callbacks, errors
}
func (bc *keyBasedBatchContainer) Flush() (
- batchData Buffer, sequenceID uint64, callbacks []interface{},
+ batchData Buffer, sequenceID uint64, callbacks []interface{}, err error,
) {
panic("multi batches container not support Flush(), please use FlushBatches() instead")
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index ffbdebb..07a8f75 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -163,6 +163,9 @@
// PartitionsAutoDiscoveryInterval is the time interval for the background process to discover new partitions
// Default is 1 minute
PartitionsAutoDiscoveryInterval time.Duration
+
+ // Encryption necessary fields to perform encryption of message
+ Encryption *ProducerEncryptionInfo
}
// Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ca6850d..4ae4e00 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,11 +19,14 @@
import (
"context"
+ "fmt"
"sync"
"sync/atomic"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+ internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
"github.com/gogo/protobuf/proto"
@@ -130,6 +133,24 @@
p.producerName = options.Name
}
+ encryption := options.Encryption
+ // add default message crypto if not provided
+ if encryption != nil && len(encryption.Keys) > 0 {
+ if encryption.KeyReader == nil {
+ return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
+ }
+
+ if encryption.MessageCrypto == nil {
+ logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID)
+ messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true, logger)
+ if err != nil {
+ logger.WithError(err).Error("Unable to get MessageCrypto instance. Producer creation is abandoned")
+ return nil, err
+ }
+ p.options.Encryption.MessageCrypto = messageCrypto
+ }
+ }
+
err := p.grabCnx()
if err != nil {
logger.WithError(err).Error("Failed to create producer")
@@ -147,6 +168,7 @@
if p.options.SendTimeout > 0 {
go p.failTimeoutMessages()
}
+
go p.runEventsLoop()
return p, nil
@@ -205,13 +227,25 @@
}
p.producerName = res.Response.ProducerSuccess.GetProducerName()
+
+ var encryptor internalcrypto.Encryptor
+ if p.options.Encryption != nil {
+ encryptor = internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
+ p.options.Encryption.KeyReader,
+ p.options.Encryption.MessageCrypto,
+ p.options.Encryption.ProducerCryptoFailureAction, p.log)
+ } else {
+ encryptor = internalcrypto.NewNoopEncryptor()
+ }
+
if p.options.DisableBatching {
provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
p,
- p.log)
+ p.log,
+ encryptor)
if err != nil {
return err
}
@@ -225,7 +259,8 @@
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
p,
- p.log)
+ p.log,
+ encryptor)
if err != nil {
return err
}
@@ -470,11 +505,22 @@
}
func (p *partitionProducer) internalFlushCurrentBatch() {
- batchData, sequenceID, callbacks := p.batchBuilder.Flush()
+ batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData == nil {
return
}
+ // error occurred in batch flush
+ // report it using callback
+ if err != nil {
+ for _, cb := range callbacks {
+ if sr, ok := cb.(*sendRequest); ok {
+ sr.callback(nil, sr.msg, err)
+ }
+ }
+ return
+ }
+
p.pendingQueue.Put(&pendingItem{
sentAt: time.Now(),
batchData: batchData,
@@ -589,12 +635,22 @@
}
func (p *partitionProducer) internalFlushCurrentBatches() {
- batchesData, sequenceIDs, callbacks := p.batchBuilder.FlushBatches()
+ batchesData, sequenceIDs, callbacks, errors := p.batchBuilder.FlushBatches()
if batchesData == nil {
return
}
for i := range batchesData {
+ // error occurred in processing batch
+ // report it using callback
+ if errors[i] != nil {
+ for _, cb := range callbacks[i] {
+ if sr, ok := cb.(*sendRequest); ok {
+ sr.callback(nil, sr.msg, errors[i])
+ }
+ }
+ continue
+ }
if batchesData[i] == nil {
continue
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index dc7a5ef..f914017 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -30,6 +30,8 @@
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/stretchr/testify/assert"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
+ plog "github.com/apache/pulsar-client-go/pulsar/log"
log "github.com/sirupsen/logrus"
)
@@ -996,6 +998,113 @@
makeHTTPCall(t, http.MethodDelete, quotaURL, "")
}
+func TestProducerWithRSAEncryption(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger())
+ assert.Nil(t, err)
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ MessageCrypto: msgCrypto,
+ Keys: []string{"my-app.key"},
+ },
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ if _, err := producer.Send(ctx, &ProducerMessage{
+ Value: fmt.Sprintf("hello-%d", i),
+ }); err != nil {
+ log.Fatal(err)
+ }
+ }
+}
+
+func TestProducuerCreationFailOnNilKeyReader(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+
+ msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger())
+ assert.Nil(t, err)
+
+ // create producer
+ // Producer creation should fail as keyreader is nil
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ Encryption: &ProducerEncryptionInfo{
+ MessageCrypto: msgCrypto,
+ Keys: []string{"my-app.key"},
+ },
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.NotNil(t, err)
+ assert.Nil(t, producer)
+}
+
+func TestProducuerSendFailOnInvalidKey(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+
+ msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, plog.DefaultNopLogger())
+ assert.Nil(t, err)
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ DisableBatching: false,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader: crypto.NewFileKeyReader("crypto/testdata/invalid_pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ MessageCrypto: msgCrypto,
+ Keys: []string{"my-app.key"},
+ },
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+ assert.NotNil(t, producer)
+
+ // producer should send return an error as keyreader is configured with wrong pub.key and fail while encrypting message
+ mid, err := producer.Send(context.Background(), &ProducerMessage{
+ Value: "test",
+ })
+
+ assert.NotNil(t, err)
+ assert.Nil(t, mid)
+}
+
type noopProduceInterceptor struct{}
func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}