blob: 74e4a0b6b7e710bbcff1af91c89a56b1bb2bdecf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rocketmq
/*
#cgo LDFLAGS: -L/usr/local/lib -lrocketmq
#include <stdio.h>
#include <stdlib.h>
#include "rocketmq/CMessageExt.h"
#include "rocketmq/CPullConsumer.h"
*/
import "C"
import (
"errors"
"fmt"
"sync"
"unsafe"
)
// PullStatus pull status
type PullStatus int
// predefined pull status
const (
PullFound = PullStatus(C.E_FOUND)
PullNoNewMsg = PullStatus(C.E_NO_NEW_MSG)
PullNoMatchedMsg = PullStatus(C.E_NO_MATCHED_MSG)
PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL)
PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT)
)
func (ps PullStatus) String() string {
switch ps {
case PullFound:
return "Found"
case PullNoNewMsg:
return "NoNewMsg"
case PullNoMatchedMsg:
return "NoMatchedMsg"
case PullOffsetIllegal:
return "OffsetIllegal"
case PullBrokerTimeout:
return "BrokerTimeout"
default:
return "Unknown status"
}
}
// defaultPullConsumer default consumer pulling the message
type defaultPullConsumer struct {
PullConsumerConfig
cconsumer *C.struct_CPullConsumer
funcsMap sync.Map
}
func (c *defaultPullConsumer) String() string {
topics := ""
c.funcsMap.Range(func(key, value interface{}) bool {
topics += key.(string) + ", "
return true
})
return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics)
}
// NewPullConsumer creates a pull consumer
func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error) {
if config == nil {
return nil, errors.New("config is nil")
}
if config.GroupID == "" {
return nil, errors.New("GroupId is empty")
}
if config.NameServer == "" && config.NameServerDomain == "" {
return nil, errors.New("NameServer and NameServerDomain is empty")
}
cs := C.CString(config.GroupID)
cconsumer := C.CreatePullConsumer(cs)
C.free(unsafe.Pointer(cs))
if cconsumer == nil {
return nil, errors.New("create PullConsumer failed")
}
var err rmqError
if config.NameServer != "" {
cs = C.CString(config.NameServer)
err = rmqError(C.SetPullConsumerNameServerAddress(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
}
}
if config.NameServerDomain != "" {
cs = C.CString(config.NameServerDomain)
err = rmqError(C.SetPullConsumerNameServerDomain(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
}
}
if config.Credentials != nil {
ak := C.CString(config.Credentials.AccessKey)
sk := C.CString(config.Credentials.SecretKey)
ch := C.CString(config.Credentials.Channel)
err = rmqError(C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch))
C.free(unsafe.Pointer(ak))
C.free(unsafe.Pointer(sk))
C.free(unsafe.Pointer(ch))
if err != NIL {
return nil, err
}
}
if config.LogC != nil {
cs = C.CString(config.LogC.Path)
err = rmqError(C.SetPullConsumerLogPath(cconsumer, cs))
C.free(unsafe.Pointer(cs))
if err != NIL {
return nil, err
}
err = rmqError(C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize)))
if err != NIL {
return nil, err
}
err = rmqError(C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level)))
if err != NIL {
return nil, err
}
}
return &defaultPullConsumer{PullConsumerConfig: *config, cconsumer: cconsumer}, nil
}
// Start starts the pulling consumer
func (c *defaultPullConsumer) Start() error {
err := rmqError(C.StartPullConsumer(c.cconsumer))
if err != NIL {
return err
}
return nil
}
// Shutdown shutdown the pulling consumer
func (c *defaultPullConsumer) Shutdown() error {
err := rmqError(C.ShutdownPullConsumer(c.cconsumer))
if err != NIL {
return err
}
err = rmqError(C.DestroyPullConsumer(c.cconsumer))
if err != NIL {
return err
}
return nil
}
// FetchSubscriptionMessageQueues returns the topic's consume queue
func (c *defaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue {
var (
q *C.struct__CMessageQueue_
size C.int
)
ctopic := C.CString(topic)
C.FetchSubscriptionMessageQueues(c.cconsumer, ctopic, &q, &size)
C.free(unsafe.Pointer(ctopic))
if size == 0 {
return nil
}
qs := make([]MessageQueue, size)
for i := range qs {
cq := (*C.struct__CMessageQueue_)(
unsafe.Pointer(uintptr(unsafe.Pointer(q)) + uintptr(i)*unsafe.Sizeof(*q)),
)
qs[i].ID, qs[i].Broker, qs[i].Topic = int(cq.queueId), C.GoString(&cq.brokerName[0]), topic
}
C.ReleaseSubscriptionMessageQueue(q)
return qs
}
// PullResult the pull result
type PullResult struct {
NextBeginOffset int64
MinOffset int64
MaxOffset int64
Status PullStatus
Messages []*MessageExt
}
func (pr *PullResult) String() string {
return fmt.Sprintf("%+v", *pr)
}
// Pull pulling the message from the specified message queue
func (c *defaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult {
cmq := C.struct__CMessageQueue_{
queueId: C.int(mq.ID),
}
copy(cmq.topic[:], *(*[]C.char)(unsafe.Pointer(&mq.Topic)))
copy(cmq.brokerName[:], *(*[]C.char)(unsafe.Pointer(&mq.Broker)))
csubExpr := C.CString(subExpression)
cpullResult := C.Pull(c.cconsumer, &cmq, csubExpr, C.longlong(offset), C.int(maxNums))
pr := PullResult{
NextBeginOffset: int64(cpullResult.nextBeginOffset),
MinOffset: int64(cpullResult.minOffset),
MaxOffset: int64(cpullResult.maxOffset),
Status: PullStatus(cpullResult.pullStatus),
}
if cpullResult.size > 0 {
msgs := make([]*MessageExt, cpullResult.size)
for i := range msgs {
msgs[i] = cmsgExtToGo(*(**C.struct_CMessageExt)(
unsafe.Pointer(
uintptr(unsafe.Pointer(cpullResult.msgFoundList)) + uintptr(i)*unsafe.Sizeof(*cpullResult.msgFoundList),
),
))
}
pr.Messages = msgs
}
C.free(unsafe.Pointer(csubExpr))
C.ReleasePullResult(cpullResult)
return pr
}