Support lite order producer
diff --git a/core/api.go b/core/api.go
index 22eef1d..f606b0d 100644
--- a/core/api.go
+++ b/core/api.go
@@ -48,6 +48,24 @@
return str
}
+type ProducerModel int
+
+const (
+ CommonProducer = ProducerModel(1)
+ OrderlyProducer = ProducerModel(2)
+)
+
+func (mode ProducerModel) String() string {
+ switch mode {
+ case CommonProducer:
+ return "CommonProducer"
+ case OrderlyProducer:
+ return "OrderlyProducer"
+ default:
+ return "Unknown"
+ }
+}
+
// NewProducer create a new producer with config
func NewProducer(config *ProducerConfig) (Producer, error) {
return newDefaultProducer(config)
@@ -59,6 +77,7 @@
SendMsgTimeout int
CompressLevel int
MaxMessageSize int
+ ProducerModel ProducerModel
}
func (config *ProducerConfig) String() string {
@@ -75,7 +94,7 @@
if config.MaxMessageSize > 0 {
str = strJoin(str, "MaxMessageSize", config.MaxMessageSize)
}
-
+ str = strJoin(str, "ProducerModel", config.ProducerModel.String())
return str + "]"
}
@@ -93,6 +112,8 @@
// SendMessageOneway send a message with oneway
SendMessageOneway(msg *Message) error
+
+ SendMessageOrderlyByShardingKey(msg *Message, shardingkey string) (*SendResult, error)
}
// NewPushConsumer create a new consumer with config.
diff --git a/core/producer.go b/core/producer.go
index fe2d978..e54890f 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -35,6 +35,7 @@
"errors"
log "github.com/sirupsen/logrus"
"unsafe"
+ "github.com/apache/rocketmq-client-go/core"
)
type SendStatus int
@@ -76,7 +77,12 @@
producer := &defaultProducer{config: config}
cs := C.CString(config.GroupID)
- cproduer := C.CreateProducer(cs)
+ var cproduer *C.struct_CProducer
+ if config.ProducerModel == OrderlyProducer {
+ cproduer = C.CreateOrderlyProducer(cs)
+ } else {
+ cproduer = C.CreateProducer(cs)
+ }
C.free(unsafe.Pointer(cs))
if cproduer == nil {
@@ -223,6 +229,10 @@
}
func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueueSelector, arg interface{}, autoRetryTimes int) (*SendResult, error) {
+ if p.config.ProducerModel == OrderlyProducer {
+ log.Warnf("Can not send message orderly by common select queue in lite order producer")
+ return nil, rocketmq.ErrSendOrderlyFailed
+ }
cmsg := goMsgToC(msg)
defer C.DestroyMessage(cmsg)
key := selectors.put(&messageQueueSelectorWrapper{selector: selector, m: msg, arg: arg})
@@ -235,12 +245,12 @@
unsafe.Pointer(&key),
C.int(autoRetryTimes),
&sr))
-
+
if err != NIL {
log.Warnf("send message orderly error, error is: %s", err.Error())
return nil, err
}
-
+
return &SendResult{
Status: SendStatus(sr.sendStatus),
MsgId: C.GoString(&sr.msgId[0]),
@@ -257,7 +267,32 @@
log.Warnf("send message with oneway error, error is: %s", err.Error())
return err
}
-
+
log.Debugf("Send Message: %s with oneway success.", msg.String())
return nil
}
+
+func (p *defaultProducer) SendMessageOrderlyByShardingKey(msg *Message, shardingkey string) (*SendResult, error) {
+ if p.config.ProducerModel != OrderlyProducer {
+ log.Warnf("Can not send message orderly, This method only support in lite order producer.")
+ return nil, rocketmq.ErrSendOrderlyFailed
+ }
+ cmsg := goMsgToC(msg)
+ defer C.DestroyMessage(cmsg)
+ var sr C.struct__SendResult_
+ err := rmqError(C.SendMessageOrderlyByShardingKey(
+ p.cproduer,
+ cmsg,
+ unsafe.Pointer(&shardingkey)))
+
+ if err != NIL {
+ log.Warnf("send message orderly error, error is: %s", err.Error())
+ return nil, err
+ }
+
+ return &SendResult{
+ Status: SendStatus(sr.sendStatus),
+ MsgId: C.GoString(&sr.msgId[0]),
+ Offset: int64(sr.offset),
+ }, nil
+}