Support setting the log level and add more logging to the offset store. (#489)
* Add logging for broker-backed offset management
* Modify the log levels used for offset logs
* Support setting the log level
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 44a0597..17f5d76 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -255,6 +255,12 @@
rlog.LogKeyUnderlayError: err.Error(),
"offset": off,
})
+ } else {
+ rlog.Info("update offset to broker success", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: r.group,
+ rlog.LogKeyMessageQueue: mq.String(),
+ "offset": off,
+ })
}
}
}
@@ -264,8 +270,9 @@
defer r.mutex.Unlock()
delete(r.OffsetTable, *mq)
- rlog.Info("delete mq from offset table", map[string]interface{}{
- rlog.LogKeyMessageQueue: mq,
+ rlog.Warning("delete mq from offset table", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: r.group,
+ rlog.LogKeyMessageQueue: mq,
})
}
@@ -286,13 +293,19 @@
case _ReadFromStore:
off, err := r.fetchConsumeOffsetFromBroker(r.group, mq)
if err != nil {
- rlog.Error("fecth offset of mq error", map[string]interface{}{
+ rlog.Error("fecth offset of mq from broker error", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: r.group,
rlog.LogKeyMessageQueue: mq.String(),
rlog.LogKeyUnderlayError: err,
})
r.mutex.RUnlock()
return -1
}
+ rlog.Warning("fecth offset of mq from broker success", map[string]interface{}{
+ rlog.LogKeyConsumerGroup: r.group,
+ rlog.LogKeyMessageQueue: mq.String(),
+ "offset": off,
+ })
r.mutex.RUnlock()
r.update(mq, off, true)
return off
diff --git a/rlog/log.go b/rlog/log.go
index 426f698..444ec3f 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -41,6 +41,7 @@
Warning(msg string, fields map[string]interface{})
Error(msg string, fields map[string]interface{})
Fatal(msg string, fields map[string]interface{})
+ Level(level string)
}
func init() {
@@ -101,11 +102,29 @@
}
l.logger.WithFields(fields).Fatal(msg)
}
+// Level sets the minimum severity emitted by the default logger.
+//
+// The name is matched case-insensitively. "debug", "warn"/"warning", "error"
+// and "fatal" select the corresponding logrus level; "info" and any
+// unrecognized name select the info level.
+func (l *defaultLogger) Level(level string) {
+	switch strings.ToLower(level) {
+	case "debug":
+		l.logger.SetLevel(logrus.DebugLevel)
+	case "warn", "warning":
+		// "warning" mirrors the name of the Logger.Warning method.
+		l.logger.SetLevel(logrus.WarnLevel)
+	case "error":
+		l.logger.SetLevel(logrus.ErrorLevel)
+	case "fatal":
+		// Without this case "fatal" would hit the default branch and
+		// enable info output, the opposite of what was asked for.
+		l.logger.SetLevel(logrus.FatalLevel)
+	default:
+		l.logger.SetLevel(logrus.InfoLevel)
+	}
+}
// SetLogger use specified logger user customized, in general, we suggest user to replace the default logger with specified
func SetLogger(logger Logger) {
rLog = logger
}
+// SetLogLevel forwards a non-empty level name to the current logger; an empty name is ignored.
+func SetLogLevel(level string) {
+	if level != "" {
+		rLog.Level(level)
+	}
+}
func Debug(msg string, fields map[string]interface{}) {
rLog.Debug(msg, fields)