fixed
diff --git a/core/api.go b/core/api.go
index 92619f3..b64cc97 100644
--- a/core/api.go
+++ b/core/api.go
@@ -29,7 +29,7 @@
// ProducerConfig define a producer
type ProducerConfig struct {
- GroupID string
+ GroupID string
NameServer string
Credentials *SessionCredentials
}
@@ -74,6 +74,15 @@
Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
}
+// PullConsumer consumer pulling the message
+type PullConsumer interface {
+ baseAPI
+ // Pull returns the messages from the consume queue by specify the offset and the max number
+ Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
+ // FetchSubscriptionMessageQueues returns the consume queue of the topic
+ FetchSubscriptionMessageQueues(topic string) []MessageQueue
+}
+
type SessionCredentials struct {
AccessKey string
SecretKey string
diff --git a/core/log.go b/core/log.go
index d9b87e8..e081e4c 100644
--- a/core/log.go
+++ b/core/log.go
@@ -21,6 +21,7 @@
#include "rocketmq/CCommon.h"
*/
import "C"
+import "fmt"
// LogLevel the log level
type LogLevel int
@@ -43,3 +44,7 @@
FileSize int64
Level LogLevel
}
+
+func (lc *LogConfig) String() string {
+ return fmt.Sprintf("%+v", *lc)
+}
diff --git a/core/pull_consumer.go b/core/pull_consumer.go
index e236ee6..19a033c 100644
--- a/core/pull_consumer.go
+++ b/core/pull_consumer.go
@@ -46,6 +46,23 @@
PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
)
+func (ps PullStatus) String() string {
+ switch ps {
+ case PullFound:
+ return "Found"
+ case PullNoNewMsg:
+ return "NoNewMsg"
+ case PullNoMatchedMsg:
+ return "NoMatchedMsg"
+ case PullOffsetIllegal:
+ return "OffsetIllegal"
+ case PullBrokerTimeout:
+ return "BrokerTimeout"
+ default:
+ return "Unknown status"
+ }
+}
+
// PullConsumerConfig the configuration for the pull consumer
type PullConsumerConfig struct {
GroupID string
@@ -133,7 +150,7 @@
return nil
}
-// FetchSubscriptionMessageQueues fetchs the topic's subcripted message queues
+// FetchSubscriptionMessageQueues returns the topic's consume queue
func (c *DefaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
var (
q *C.struct__CMessageQueue_
@@ -168,6 +185,10 @@
Messages []*MessageExt
}
+func (pr *PullResult) String() string {
+ return fmt.Sprintf("%+v", *pr)
+}
+
// Pull pulling the message from the specified message queue
func (c *DefaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
cmq := C.struct__CMessageQueue_{
@@ -176,15 +197,11 @@
copy(cmq.topic[:], *(*[]C.char)(unsafe.Pointer(&mq.Topic)))
copy(cmq.brokerName[:], *(*[]C.char)(unsafe.Pointer(&mq.Broker)))
- fmt.Printf("%+v\n", []byte(mq.Topic))
- fmt.Printf("%+v\n", cmq.topic)
- fmt.Printf("%+v\n", []byte(mq.Broker))
- fmt.Printf("%+v\n", cmq.brokerName)
csubExpr := C.CString(subExpression)
cpullResult := C.Pull(c.cconsumer, &cmq, csubExpr, C.longlong(offset), C.int(maxNums))
- pullResult := PullResult{
+ pr := PullResult{
NextBeginOffset: int64(cpullResult.nextBeginOffset),
MinOffset: int64(cpullResult.minOffset),
MaxOffset: int64(cpullResult.maxOffset),
@@ -193,15 +210,16 @@
if cpullResult.size > 0 {
msgs := make([]*MessageExt, cpullResult.size)
for i := range msgs {
- msgs[i] = cmsgExtToGo((*C.struct_CMessageExt)(
+ msgs[i] = cmsgExtToGo(*(**C.struct_CMessageExt)(
unsafe.Pointer(
uintptr(unsafe.Pointer(cpullResult.msgFoundList)) + uintptr(i)*unsafe.Sizeof(*cpullResult.msgFoundList),
),
))
}
+ pr.Messages = msgs
}
C.free(unsafe.Pointer(csubExpr))
C.ReleasePullResult(cpullResult)
- return pullResult
+ return pr
}