Add set maxcachesize api to support simple flow-control
diff --git a/core/api.go b/core/api.go
index f606b0d..152deae 100644
--- a/core/api.go
+++ b/core/api.go
@@ -160,10 +160,12 @@
// PushConsumerConfig define a new consumer.
type PushConsumerConfig struct {
ClientConfig
- ThreadCount int
- MessageBatchMaxSize int
- Model MessageModel
- ConsumerModel ConsumerModel
+ ThreadCount int
+ MessageBatchMaxSize int
+ Model MessageModel
+ ConsumerModel ConsumerModel
+ MaxCacheMessageSize int
+ MaxCacheMessageSizeInMB int
}
func (config *PushConsumerConfig) String() string {
@@ -185,6 +187,14 @@
if config.ConsumerModel != 0 {
str = strJoin(str, "ConsumerModel", config.ConsumerModel.String())
}
+
+ if config.MaxCacheMessageSize != 0 {
+ str = strJoin(str, "MaxCacheMessageSize", config.MaxCacheMessageSize)
+ }
+
+ if config.MaxCacheMessageSizeInMB != 0 {
+ str = strJoin(str, "MaxCacheMessageSizeInMB", config.MaxCacheMessageSizeInMB)
+ }
return str + "]"
}
diff --git a/core/push_consumer.go b/core/push_consumer.go
index 29da7fb..017a733 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -167,6 +167,20 @@
}
}
+ if config.MaxCacheMessageSize > 0 {
+ err = rmqError(C.SetPushConsumerMaxCacheMessageSize(cconsumer, C.int(config.MaxCacheMessageSize)))
+ if err != NIL {
+ return nil, err
+ }
+ }
+
+ if config.MaxCacheMessageSizeInMB > 0 {
+ err = rmqError(C.SetPushConsumerMaxCacheMessageSizeInMb(cconsumer, C.int(config.MaxCacheMessageSizeInMB)))
+ if err != NIL {
+ return nil, err
+ }
+ }
+
if config.Model != 0 {
var mode C.CMessageModel
switch config.Model {