[ISSUE #354] feat: Support PanicHandler (#355)
* feat: Support PanicHandler
Closes #354
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 0c7f224..731eb9d 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -66,7 +66,6 @@
subscribedTopic map[string]string
interceptor primitive.Interceptor
queueLock *QueueLock
- lockTicker *time.Ticker
done chan struct{}
closeOnce sync.Once
}
@@ -107,7 +106,6 @@
defaultConsumer: dc,
subscribedTopic: make(map[string]string, 0),
queueLock: newQueueLock(),
- lockTicker: time.NewTicker(dc.option.RebalanceLockInterval),
done: make(chan struct{}, 1),
consumeFunc: utils.NewSet(),
}
@@ -168,14 +166,16 @@
pc.Rebalance()
time.Sleep(1 * time.Second)
- go func() {
+ go primitive.WithRecover(func() {
// initial lock.
time.Sleep(1000 * time.Millisecond)
pc.lockAll()
+ lockTicker := time.NewTicker(pc.option.RebalanceLockInterval)
+ defer lockTicker.Stop()
for {
select {
- case <-pc.lockTicker.C:
+ case <-lockTicker.C:
pc.lockAll()
case <-pc.done:
rlog.Info("push consumer close tick.", map[string]interface{}{
@@ -184,7 +184,7 @@
return
}
}
- }()
+ })
})
if err != nil {
@@ -209,7 +209,6 @@
func (pc *pushConsumer) Shutdown() error {
var err error
pc.closeOnce.Do(func() {
- pc.lockTicker.Stop()
close(pc.done)
pc.client.UnregisterConsumer(pc.consumerGroup)
@@ -438,7 +437,7 @@
})
var sleepTime time.Duration
pq := request.pq
- go func() {
+ go primitive.WithRecover(func() {
for {
select {
case <-pc.done:
@@ -450,7 +449,7 @@
pc.submitToConsume(request.pq, request.mq)
}
}
- }()
+ })
for {
NEXT:
@@ -683,13 +682,13 @@
})
request.nextOffset = result.NextBeginOffset
pq.WithDropped(true)
- go func() {
+ go primitive.WithRecover(func() {
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
pc.storage.remove(request.mq)
rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
- }()
+ })
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
sleepTime = _PullDelayTimeWhenError
@@ -866,7 +865,7 @@
subMsgs = msgs[count:next]
count = next - 1
}
- go func() {
+ go primitive.WithRecover(func() {
RETRY:
if pq.IsDroppd() {
rlog.Info("the message queue not be able to consume, because it was dropped", map[string]interface{}{
@@ -948,7 +947,7 @@
"message": msgs,
})
}
- }()
+ })
}
}
diff --git a/consumer/statistics.go b/consumer/statistics.go
index a58ff9e..fdc6379 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -24,6 +24,7 @@
"sync/atomic"
"time"
+ "github.com/apache/rocketmq-client-go/primitive"
"github.com/apache/rocketmq-client-go/rlog"
)
@@ -148,7 +149,7 @@
}
func (sis *statsItemSet) init() {
- go func() {
+ go primitive.WithRecover(func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
@@ -160,9 +161,9 @@
}
}
- }()
+ })
- go func() {
+ go primitive.WithRecover(func() {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for {
@@ -173,9 +174,9 @@
sis.samplingInMinutes()
}
}
- }()
+ })
- go func() {
+ go primitive.WithRecover(func() {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
for {
@@ -186,9 +187,9 @@
sis.samplingInHour()
}
}
- }()
+ })
- go func() {
+ go primitive.WithRecover(func() {
time.Sleep(nextMinutesTime().Sub(time.Now()))
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
@@ -200,9 +201,9 @@
sis.printAtMinutes()
}
}
- }()
+ })
- go func() {
+ go primitive.WithRecover(func() {
time.Sleep(nextHourTime().Sub(time.Now()))
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()
@@ -214,9 +215,9 @@
sis.printAtHour()
}
}
- }()
+ })
- go func() {
+ go primitive.WithRecover(func() {
time.Sleep(nextMonthTime().Sub(time.Now()))
ticker := time.NewTicker(24 * time.Hour)
defer ticker.Stop()
@@ -228,7 +229,7 @@
sis.printAtDay()
}
}
- }()
+ })
}
func (sis *statsItemSet) samplingInSeconds() {
diff --git a/errors.go b/errors.go
index 6a774aa..fe9ba33 100644
--- a/errors.go
+++ b/errors.go
@@ -22,7 +22,6 @@
)
var (
- // ErrRequestTimeout for request timeout error
ErrRequestTimeout = errors.New("request timeout")
ErrMQEmpty = errors.New("MessageQueue is nil")
ErrOffset = errors.New("offset < 0")
diff --git a/internal/client.go b/internal/client.go
index ca8cc88..86a02cb 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -282,7 +282,7 @@
}
// schedule update route info
- go func() {
+ go primitive.WithRecover(func() {
// delay
ticker := time.NewTicker(_PullNameServerInterval)
defer ticker.Stop()
@@ -298,10 +298,9 @@
return
}
}
- }()
+ })
- // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
- go func() {
+ go primitive.WithRecover(func() {
ticker := time.NewTicker(_HeartbeatBrokerInterval)
defer ticker.Stop()
for {
@@ -316,10 +315,10 @@
return
}
}
- }()
+ })
// schedule persist offset
- go func() {
+ go primitive.WithRecover(func() {
ticker := time.NewTicker(_PersistOffsetInterval)
defer ticker.Stop()
for {
@@ -342,9 +341,9 @@
return
}
}
- }()
+ })
- go func() {
+ go primitive.WithRecover(func() {
ticker := time.NewTicker(_RebalanceInterval)
defer ticker.Stop()
for {
@@ -358,7 +357,7 @@
return
}
}
- }()
+ })
})
}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 3d2bf7f..58abd3a 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -95,7 +95,9 @@
if err != nil {
return err
}
- go c.receiveAsync(resp)
+ go primitive.WithRecover(func() {
+ c.receiveAsync(resp)
+ })
return nil
}
@@ -127,7 +129,9 @@
return nil, err
}
c.connectionTable.Store(addr, tcpConn)
- go c.receiveResponse(tcpConn)
+ go primitive.WithRecover(func() {
+ c.receiveResponse(tcpConn)
+ })
return tcpConn, nil
}
@@ -196,20 +200,20 @@
if exist {
c.responseTable.Delete(cmd.Opaque)
responseFuture := resp.(*ResponseFuture)
- go func() {
+ go primitive.WithRecover(func() {
responseFuture.ResponseCommand = cmd
responseFuture.executeInvokeCallback()
if responseFuture.Done != nil {
responseFuture.Done <- true
}
- }()
+ })
}
} else {
f := c.processors[cmd.Code]
if f != nil {
// single goroutine will be deadlock
// TODO: optimize with goroutine pool, https://github.com/apache/rocketmq-client-go/issues/307
- go func() {
+ go primitive.WithRecover(func() {
res := f(cmd, r.RemoteAddr())
if res != nil {
res.Opaque = cmd.Opaque
@@ -222,7 +226,7 @@
})
}
}
- }()
+ })
} else {
rlog.Warning("receive broker's requests, but no func to handle", map[string]interface{}{
"responseCode": cmd.Code,
diff --git a/internal/trace.go b/internal/trace.go
index 212ee60..9549bba 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -257,7 +257,9 @@
func (td *traceDispatcher) Start() {
td.running = true
td.cli.Start()
- go td.process()
+ go primitive.WithRecover(func() {
+ td.process()
+ })
}
func (td *traceDispatcher) Close() {
@@ -299,7 +301,9 @@
batch = append(batch, ctx)
if count == batchSize {
count = 0
- go td.batchCommit(batch)
+ go primitive.WithRecover(func() {
+ td.batchCommit(batch)
+ })
batch = make([]TraceContext, 0)
}
case <-td.ticker.C:
@@ -308,12 +312,16 @@
count++
lastput = time.Now()
if len(batch) > 0 {
- go td.batchCommit(batch)
+ go primitive.WithRecover(func() {
+ td.batchCommit(batch)
+ })
batch = make([]TraceContext, 0)
}
}
case <-td.ctx.Done():
- go td.batchCommit(batch)
+ go primitive.WithRecover(func() {
+ td.batchCommit(batch)
+ })
batch = make([]TraceContext, 0)
now := time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/internal/utils/fun.go b/internal/utils/fun.go
deleted file mode 100644
index 4e9e4ea..0000000
--- a/internal/utils/fun.go
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package utils
diff --git a/primitive/base.go b/primitive/base.go
index ae0f06f..efc48ef 100644
--- a/primitive/base.go
+++ b/primitive/base.go
@@ -80,3 +80,18 @@
}
return nil
}
+
+var PanicHandler func(interface{})
+
+func WithRecover(fn func()) {
+ defer func() {
+ handler := PanicHandler
+ if handler != nil {
+ if err := recover(); err != nil {
+ handler(err)
+ }
+ }
+ }()
+
+ fn()
+}
diff --git a/producer/producer.go b/producer/producer.go
index a47d76f..0762762 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -411,7 +411,9 @@
}
func (tp *transactionProducer) Start() error {
- go tp.checkTransactionState()
+ go primitive.WithRecover(func() {
+ tp.checkTransactionState()
+ })
return tp.producer.Start()
}
func (tp *transactionProducer) Shutdown() error {