blob: 847605f9fb3e96799e089de9d03f583b0b304856 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package golang
import (
"fmt"
"time"
)
type ConsumeService interface {
consume(ProcessQueue, []*MessageView)
consumeWithDuration(*MessageView, time.Duration, func(ConsumerResult, error))
Shutdown() error
}
type baseConsumeService struct {
clientId string
messageListener MessageListener
consumptionExecutor *simpleThreadPool
messageInterceptor MessageInterceptor
}
func NewBaseConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor) *baseConsumeService {
return &baseConsumeService{
clientId: clientId,
messageListener: messageListener,
consumptionExecutor: consumptionExecutor,
messageInterceptor: messageInterceptor,
}
}
func (bcs *baseConsumeService) consumeImmediately(messageView *MessageView, callback func(ConsumerResult, error)) {
bcs.consumeWithDuration(messageView, 0, callback)
}
func (bcs *baseConsumeService) consumeWithDuration(messageView *MessageView, duration time.Duration, callback func(ConsumerResult, error)) {
task := bcs.newConsumeTask(bcs.clientId, bcs.messageListener, messageView, bcs.messageInterceptor, callback)
if duration <= 0 {
bcs.consumptionExecutor.Submit(task)
return
}
time.AfterFunc(duration, func() { bcs.consumptionExecutor.Submit(task) })
}
func (bcs *baseConsumeService) Shutdown() error {
bcs.consumptionExecutor.Shutdown()
return nil
}
func (bcs *baseConsumeService) newConsumeTask(clientId string, messageListener MessageListener, messageView *MessageView, messageInterceptor MessageInterceptor, callback func(ConsumerResult, error)) func() {
return func() {
consumeResult := FAILURE
defer func() {
if e := recover(); e != nil {
err, ok := e.(error)
if !ok {
err = fmt.Errorf("panic cause [%v]", e)
}
sugarBaseLogger.Errorf("Message Interceptor raised an exception while consuming messages, clientId=%s, mq=%s, messageId=%s, err=%w", clientId, messageView.messageQueue.String(), messageView.messageId, err)
callback(FAILURE, err)
} else {
callback(consumeResult, nil)
}
}()
messageInterceptor.doBefore(MessageHookPoints_CONSUME, []*MessageCommon{messageView.GetMessageCommon()})
startTime := time.Now()
func() {
defer func() {
if e := recover(); e != nil {
consumeResult = FAILURE
err, ok := e.(error)
if !ok {
err = fmt.Errorf("panic cause [%v]", e)
}
sugarBaseLogger.Errorf("Message listener raised an exception while consuming messages, clientId=%s, mq=%s, messageId=%s, err=%w", clientId, messageView.messageQueue.String(), messageView.messageId, err)
}
}()
consumeResult = messageListener.consume(messageView)
}()
duration := time.Since(startTime)
status := MessageHookPointsStatus_ERROR
if consumeResult == SUCCESS {
status = MessageHookPointsStatus_OK
}
messageInterceptor.doAfter(MessageHookPoints_CONSUME, []*MessageCommon{messageView.GetMessageCommon()}, duration, status)
}
}
var _ = ConsumeService(&standardConsumeService{})
type standardConsumeService struct {
baseConsumeService
}
type fifoConsumeService struct {
baseConsumeService
}
func (scs *standardConsumeService) consume(pq ProcessQueue, messageViews []*MessageView) {
for _, mv := range messageViews {
if mv.isCorrupted() {
sugarBaseLogger.Errorf("Message is corrupted for standard consumption, prepare to discard it, mq=%s, messageId=%s, clientId=%s", pq.getMessageQueue().String(), mv.GetMessageId(), scs.clientId)
pq.discardMessage(mv)
continue
}
scs.consumeImmediately(mv, func(result ConsumerResult, err error) {
if err != nil {
sugarBaseLogger.Errorf("[Bug] Exception raised in consumption callback, clientId=%s", scs.clientId)
return
}
pq.eraseMessage(mv, result)
})
}
}
func NewStandardConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor) *standardConsumeService {
return &standardConsumeService{
*NewBaseConsumeService(clientId, messageListener, consumptionExecutor, messageInterceptor),
}
}
func (fcs *fifoConsumeService) consume(pq ProcessQueue, messageViews []*MessageView) {
fcs.consumeIteratively(pq, &messageViews, 0)
}
func (fcs *fifoConsumeService) consumeIteratively(pq ProcessQueue, messageViewsPtr *[]*MessageView, ptr int) {
if messageViewsPtr == nil {
sugarBaseLogger.Errorf("[Bug] messageViews is nil when consumeIteratively")
return
}
messageViews := *messageViewsPtr
if ptr >= len(messageViews) {
return
}
mv := messageViews[ptr]
if mv.isCorrupted() {
sugarBaseLogger.Errorf("Message is corrupted for FIFO consumption, prepare to discard it, mq=%s, messageId=%s, clientId=%s", pq.getMessageQueue().String(), mv.GetMessageId(), fcs.clientId)
pq.discardFifoMessage(mv)
fcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
return
}
fcs.consumeImmediately(mv, func(result ConsumerResult, err error) {
if err != nil {
sugarBaseLogger.Errorf("[Bug] Exception raised in consumption callback, clientId=%s", fcs.clientId)
return
}
pq.eraseFifoMessage(mv, result)
fcs.consumeIteratively(pq, messageViewsPtr, ptr+1)
})
}
func NewFiFoConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor) *fifoConsumeService {
return &fifoConsumeService{
*NewBaseConsumeService(clientId, messageListener, consumptionExecutor, messageInterceptor),
}
}