| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package rocketmq |
| |
| /* |
| #cgo LDFLAGS: -L/usr/local/lib -lrocketmq |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include "rocketmq/CMessageExt.h" |
| #include "rocketmq/CPullConsumer.h" |
| */ |
| import "C" |
| |
| import ( |
| "errors" |
| "fmt" |
| "sync" |
| "unsafe" |
| ) |
| |
| // PullStatus is the integer status code of a pull request; Pull stores it in PullResult.Status. |
| type PullStatus int |
| |
| // Predefined pull statuses. Each constant is the same-named C enumerator |
| // (E_FOUND, E_NO_NEW_MSG, ...) made available through cgo and converted to PullStatus. |
| const ( |
| PullFound = PullStatus(C.E_FOUND) |
| PullNoNewMsg = PullStatus(C.E_NO_NEW_MSG) |
| PullNoMatchedMsg = PullStatus(C.E_NO_MATCHED_MSG) |
| PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL) |
| PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT) |
| ) |
| |
| func (ps PullStatus) String() string { |
| switch ps { |
| case PullFound: |
| return "Found" |
| case PullNoNewMsg: |
| return "NoNewMsg" |
| case PullNoMatchedMsg: |
| return "NoMatchedMsg" |
| case PullOffsetIllegal: |
| return "OffsetIllegal" |
| case PullBrokerTimeout: |
| return "BrokerTimeout" |
| default: |
| return "Unknown status" |
| } |
| } |
| |
| // defaultPullConsumer is the cgo-backed pull consumer that NewPullConsumer returns. |
| type defaultPullConsumer struct { |
| // PullConsumerConfig is a copy of the configuration passed to NewPullConsumer. |
| PullConsumerConfig |
| // cconsumer is the native consumer handle obtained from C.CreatePullConsumer. |
| cconsumer *C.struct_CPullConsumer |
| // funcsMap's keys are asserted to be strings and printed as topics by String. |
| // NOTE(review): none of the code shown here stores into it — confirm it is still needed. |
| funcsMap sync.Map |
| } |
| |
| func (c *defaultPullConsumer) String() string { |
| topics := "" |
| c.funcsMap.Range(func(key, value interface{}) bool { |
| topics += key.(string) + ", " |
| return true |
| }) |
| return fmt.Sprintf("[%+v, subcribed topics: [%s]]", c.PullConsumerConfig, topics) |
| } |
| |
| // NewPullConsumer creates a pull consumer |
| func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error) { |
| if config == nil { |
| return nil, errors.New("config is nil") |
| } |
| if config.GroupID == "" { |
| return nil, errors.New("GroupId is empty") |
| } |
| |
| if config.NameServer == "" && config.NameServerDomain == "" { |
| return nil, errors.New("NameServer and NameServerDomain is empty") |
| } |
| |
| cs := C.CString(config.GroupID) |
| cconsumer := C.CreatePullConsumer(cs) |
| C.free(unsafe.Pointer(cs)) |
| if cconsumer == nil { |
| return nil, errors.New("create PullConsumer failed") |
| } |
| |
| var err rmqError |
| if config.NameServer != "" { |
| cs = C.CString(config.NameServer) |
| err = rmqError(C.SetPullConsumerNameServerAddress(cconsumer, cs)) |
| C.free(unsafe.Pointer(cs)) |
| if err != NIL { |
| return nil, err |
| } |
| } |
| |
| if config.NameServerDomain != "" { |
| cs = C.CString(config.NameServerDomain) |
| err = rmqError(C.SetPullConsumerNameServerDomain(cconsumer, cs)) |
| C.free(unsafe.Pointer(cs)) |
| if err != NIL { |
| return nil, err |
| } |
| } |
| |
| if config.Credentials != nil { |
| ak := C.CString(config.Credentials.AccessKey) |
| sk := C.CString(config.Credentials.SecretKey) |
| ch := C.CString(config.Credentials.Channel) |
| err = rmqError(C.SetPullConsumerSessionCredentials(cconsumer, ak, sk, ch)) |
| C.free(unsafe.Pointer(ak)) |
| C.free(unsafe.Pointer(sk)) |
| C.free(unsafe.Pointer(ch)) |
| if err != NIL { |
| return nil, err |
| } |
| } |
| |
| if config.LogC != nil { |
| cs = C.CString(config.LogC.Path) |
| err = rmqError(C.SetPullConsumerLogPath(cconsumer, cs)) |
| C.free(unsafe.Pointer(cs)) |
| if err != NIL { |
| return nil, err |
| } |
| |
| err = rmqError(C.SetPullConsumerLogFileNumAndSize(cconsumer, C.int(config.LogC.FileNum), C.long(config.LogC.FileSize))) |
| if err != NIL { |
| return nil, err |
| } |
| |
| err = rmqError(C.SetPullConsumerLogLevel(cconsumer, C.CLogLevel(config.LogC.Level))) |
| if err != NIL { |
| return nil, err |
| } |
| } |
| |
| return &defaultPullConsumer{PullConsumerConfig: *config, cconsumer: cconsumer}, nil |
| } |
| |
| // Start starts the pulling consumer |
| func (c *defaultPullConsumer) Start() error { |
| err := rmqError(C.StartPullConsumer(c.cconsumer)) |
| if err != NIL { |
| return err |
| } |
| return nil |
| } |
| |
| // Shutdown shutdown the pulling consumer |
| func (c *defaultPullConsumer) Shutdown() error { |
| err := rmqError(C.ShutdownPullConsumer(c.cconsumer)) |
| if err != NIL { |
| return err |
| } |
| |
| err = rmqError(C.DestroyPullConsumer(c.cconsumer)) |
| if err != NIL { |
| return err |
| } |
| return nil |
| } |
| |
| // FetchSubscriptionMessageQueues returns the topic's consume queue |
| func (c *defaultPullConsumer) FetchSubscriptionMessageQueues(topic string) []MessageQueue { |
| var ( |
| q *C.struct__CMessageQueue_ |
| size C.int |
| ) |
| |
| ctopic := C.CString(topic) |
| C.FetchSubscriptionMessageQueues(c.cconsumer, ctopic, &q, &size) |
| C.free(unsafe.Pointer(ctopic)) |
| if size == 0 { |
| return nil |
| } |
| |
| qs := make([]MessageQueue, size) |
| for i := range qs { |
| cq := (*C.struct__CMessageQueue_)( |
| unsafe.Pointer(uintptr(unsafe.Pointer(q)) + uintptr(i)*unsafe.Sizeof(*q)), |
| ) |
| qs[i].ID, qs[i].Broker, qs[i].Topic = int(cq.queueId), C.GoString(&cq.brokerName[0]), topic |
| } |
| C.ReleaseSubscriptionMessageQueue(q) |
| |
| return qs |
| } |
| |
| // PullResult is the Go-side view of one native pull result, as built by Pull. |
| type PullResult struct { |
| // NextBeginOffset, MinOffset and MaxOffset are copied from the same-named |
| // fields of the native pull result. |
| NextBeginOffset int64 |
| MinOffset int64 |
| MaxOffset int64 |
| // Status is the native pull status converted to PullStatus. |
| Status PullStatus |
| // Messages holds the converted messages; Pull leaves it nil when the native |
| // result reports no messages. |
| Messages []*MessageExt |
| } |
| |
| func (pr *PullResult) String() string { |
| return fmt.Sprintf("%+v", *pr) |
| } |
| |
| // Pull pulling the message from the specified message queue |
| func (c *defaultPullConsumer) Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult { |
| cmq := C.struct__CMessageQueue_{ |
| queueId: C.int(mq.ID), |
| } |
| |
| copy(cmq.topic[:], *(*[]C.char)(unsafe.Pointer(&mq.Topic))) |
| copy(cmq.brokerName[:], *(*[]C.char)(unsafe.Pointer(&mq.Broker))) |
| |
| csubExpr := C.CString(subExpression) |
| cpullResult := C.Pull(c.cconsumer, &cmq, csubExpr, C.longlong(offset), C.int(maxNums)) |
| |
| pr := PullResult{ |
| NextBeginOffset: int64(cpullResult.nextBeginOffset), |
| MinOffset: int64(cpullResult.minOffset), |
| MaxOffset: int64(cpullResult.maxOffset), |
| Status: PullStatus(cpullResult.pullStatus), |
| } |
| if cpullResult.size > 0 { |
| msgs := make([]*MessageExt, cpullResult.size) |
| for i := range msgs { |
| msgs[i] = cmsgExtToGo(*(**C.struct_CMessageExt)( |
| unsafe.Pointer( |
| uintptr(unsafe.Pointer(cpullResult.msgFoundList)) + uintptr(i)*unsafe.Sizeof(*cpullResult.msgFoundList), |
| ), |
| )) |
| } |
| pr.Messages = msgs |
| } |
| |
| C.free(unsafe.Pointer(csubExpr)) |
| C.ReleasePullResult(cpullResult) |
| return pr |
| } |