Merge pull request #554 from lampnick/master
modified typo word Resovler to Resolver
diff --git a/admin/admin.go b/admin/admin.go
new file mode 100644
index 0000000..f45f39a
--- /dev/null
+++ b/admin/admin.go
@@ -0,0 +1,202 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/apache/rocketmq-client-go/v2/internal"
+ "github.com/apache/rocketmq-client-go/v2/internal/remote"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+ "github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+type Admin interface {
+ CreateTopic(ctx context.Context, opts ...OptionCreate) error
+ DeleteTopic(ctx context.Context, opts ...OptionDelete) error
+ // TODO: the two operations below are not implemented yet.
+ //TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error)
+ //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
+ Close() error
+}
+
+// TODO: move outdated context to ctx
+type adminOptions struct {
+ internal.ClientOptions
+}
+
+type AdminOption func(options *adminOptions)
+
+// defaultAdminOptions returns the stock client options with the group name
+// set to "TOOLS_ADMIN" and the instance name set to the current time.
+func defaultAdminOptions() *adminOptions {
+	clientOpts := internal.DefaultClientOptions()
+	clientOpts.GroupName = "TOOLS_ADMIN"
+	clientOpts.InstanceName = time.Now().String()
+	return &adminOptions{ClientOptions: clientOpts}
+}
+
+// WithResolver sets the resolver that NewAdmin passes to internal.NewNamesrv.
+func WithResolver(resolver primitive.NsResolver) AdminOption {
+	return func(o *adminOptions) {
+		o.Resolver = resolver
+	}
+}
+
+type admin struct {
+ cli internal.RMQClient
+ namesrv internal.Namesrvs
+
+ opts *adminOptions
+
+ closeOnce sync.Once
+}
+
+// NewAdmin builds an Admin from the default options with opts applied in order.
+func NewAdmin(opts ...AdminOption) (Admin, error) {
+	defaultOpts := defaultAdminOptions()
+	for _, opt := range opts {
+		opt(defaultOpts)
+	}
+	// Build the name server handle before the client, so a failure here returns
+	// without having created a client that nothing would ever shut down.
+	namesrv, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, err
+	}
+	cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	return &admin{
+		cli:     cli,
+		namesrv: namesrv,
+		opts:    defaultOpts,
+	}, nil
+}
+
+// CreateTopic asks the broker at the configured address to create the topic
+// described by opts, logging the outcome either way.
+// TODO: another implementation like sarama, without brokerAddr as input
+func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
+	topicCfg := defaultTopicConfigCreate()
+	for _, opt := range opts {
+		opt(&topicCfg)
+	}
+
+	header := &internal.CreateTopicRequestHeader{
+		Topic:           topicCfg.Topic,
+		DefaultTopic:    topicCfg.DefaultTopic,
+		ReadQueueNums:   topicCfg.ReadQueueNums,
+		WriteQueueNums:  topicCfg.WriteQueueNums,
+		Perm:            topicCfg.Perm,
+		TopicFilterType: topicCfg.TopicFilterType,
+		TopicSysFlag:    topicCfg.TopicSysFlag,
+		Order:           topicCfg.Order,
+	}
+	req := remote.NewRemotingCommand(internal.ReqCreateTopic, header, nil)
+	if _, err := a.cli.InvokeSync(ctx, topicCfg.BrokerAddr, req, 5*time.Second); err != nil {
+		rlog.Error("create topic error", map[string]interface{}{
+			rlog.LogKeyTopic:         topicCfg.Topic,
+			rlog.LogKeyBroker:        topicCfg.BrokerAddr,
+			rlog.LogKeyUnderlayError: err,
+		})
+		return err
+	}
+
+	rlog.Info("create topic success", map[string]interface{}{
+		rlog.LogKeyTopic:  topicCfg.Topic,
+		rlog.LogKeyBroker: topicCfg.BrokerAddr,
+	})
+	return nil
+}
+
+// deleteTopicInBroker asks the broker at brokerAddr to delete topic and returns its response.
+func (a *admin) deleteTopicInBroker(ctx context.Context, topic string, brokerAddr string) (*remote.RemotingCommand, error) {
+	cmd := remote.NewRemotingCommand(
+		internal.ReqDeleteTopicInBroker,
+		&internal.DeleteTopicRequestHeader{Topic: topic},
+		nil,
+	)
+	return a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second)
+}
+
+// deleteTopicInNameServer asks the name server at nameSrvAddr to delete topic and returns its response.
+func (a *admin) deleteTopicInNameServer(ctx context.Context, topic string, nameSrvAddr string) (*remote.RemotingCommand, error) {
+	cmd := remote.NewRemotingCommand(
+		internal.ReqDeleteTopicInNameSrv,
+		&internal.DeleteTopicRequestHeader{Topic: topic},
+		nil,
+	)
+	return a.cli.InvokeSync(ctx, nameSrvAddr, cmd, 5*time.Second)
+}
+
+// DeleteTopic deletes the topic from one broker and then from the name servers.
+//
+// An unset broker address is looked up through the topic route, and an unset
+// name server list falls back to a.namesrv.AddrList(). The first failing request
+// is logged and its error returned without contacting the remaining servers.
+func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
+	cfg := defaultTopicConfigDelete()
+	for _, apply := range opts {
+		apply(&cfg)
+	}
+	//delete topic in broker
+	if cfg.BrokerAddr == "" {
+		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
+	}
+
+	if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
+		rlog.Error("delete topic in broker error", map[string]interface{}{
+			rlog.LogKeyTopic:         cfg.Topic,
+			rlog.LogKeyBroker:        cfg.BrokerAddr,
+			rlog.LogKeyUnderlayError: err,
+		})
+		return err
+	}
+
+	//delete topic in nameserver
+	if len(cfg.NameSrvAddr) == 0 {
+		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		cfg.NameSrvAddr = a.namesrv.AddrList()
+	}
+
+	for _, nameSrvAddr := range cfg.NameSrvAddr {
+		if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
+			rlog.Error("delete topic in name server error", map[string]interface{}{
+				rlog.LogKeyTopic:         cfg.Topic,
+				"nameServer":             nameSrvAddr,
+				rlog.LogKeyUnderlayError: err,
+			})
+			return err
+		}
+	}
+	rlog.Info("delete topic success", map[string]interface{}{
+		rlog.LogKeyTopic:  cfg.Topic,
+		rlog.LogKeyBroker: cfg.BrokerAddr,
+		"nameServer":      cfg.NameSrvAddr,
+	})
+	return nil
+}
+
+// Close shuts down the underlying client. Calls after the first are no-ops,
+// and the returned error is always nil.
+func (a *admin) Close() error {
+	a.closeOnce.Do(func() { a.cli.Shutdown() })
+	return nil
+}
diff --git a/admin/option.go b/admin/option.go
new file mode 100644
index 0000000..d5a648e
--- /dev/null
+++ b/admin/option.go
@@ -0,0 +1,131 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+// defaultTopicConfigCreate returns the create defaults; Topic and BrokerAddr stay empty.
+func defaultTopicConfigCreate() TopicConfigCreate {
+	return TopicConfigCreate{
+		DefaultTopic:    "defaultTopic",
+		ReadQueueNums:   8,
+		WriteQueueNums:  8,
+		Perm:            6,
+		TopicFilterType: "SINGLE_TAG",
+		TopicSysFlag:    0,
+		Order:           false,
+	}
+}
+
+type TopicConfigCreate struct {
+ Topic string
+ BrokerAddr string
+ DefaultTopic string
+ ReadQueueNums int
+ WriteQueueNums int
+ Perm int
+ TopicFilterType string
+ TopicSysFlag int
+ Order bool
+}
+
+type OptionCreate func(*TopicConfigCreate)
+
+// WithTopicCreate sets the name of the topic that CreateTopic sends to the
+// broker. There is no default.
+func WithTopicCreate(topic string) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.Topic = topic }
+}
+
+// WithBrokerAddrCreate sets the address of the broker that CreateTopic sends
+// its request to. There is no default.
+func WithBrokerAddrCreate(brokerAddr string) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.BrokerAddr = brokerAddr }
+}
+
+// WithReadQueueNums overrides the ReadQueueNums value of the create request;
+// the default is 8.
+func WithReadQueueNums(readQueueNums int) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.ReadQueueNums = readQueueNums }
+}
+
+// WithWriteQueueNums overrides the WriteQueueNums value of the create request;
+// the default is 8.
+func WithWriteQueueNums(writeQueueNums int) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.WriteQueueNums = writeQueueNums }
+}
+
+// WithPerm overrides the Perm value of the create request; the default set by
+// defaultTopicConfigCreate is 6.
+func WithPerm(perm int) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.Perm = perm }
+}
+
+// WithTopicFilterType overrides the TopicFilterType value of the create
+// request; the default is "SINGLE_TAG".
+func WithTopicFilterType(topicFilterType string) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.TopicFilterType = topicFilterType }
+}
+
+// WithTopicSysFlag overrides the TopicSysFlag value of the create request;
+// the default is 0.
+func WithTopicSysFlag(topicSysFlag int) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.TopicSysFlag = topicSysFlag }
+}
+
+// WithOrder overrides the Order value of the create request; the default is
+// false.
+func WithOrder(order bool) OptionCreate {
+	return func(opts *TopicConfigCreate) { opts.Order = order }
+}
+
+// defaultTopicConfigDelete returns an empty configuration: no delete option has a default.
+func defaultTopicConfigDelete() TopicConfigDelete {
+	return TopicConfigDelete{}
+}
+
+type TopicConfigDelete struct {
+ Topic string
+ ClusterName string
+ NameSrvAddr []string
+ BrokerAddr string
+}
+
+type OptionDelete func(*TopicConfigDelete)
+
+// WithTopicDelete sets the name of the topic that DeleteTopic removes. There
+// is no default.
+func WithTopicDelete(topic string) OptionDelete {
+	return func(opts *TopicConfigDelete) { opts.Topic = topic }
+}
+
+// WithBrokerAddrDelete sets the broker to delete the topic from. When it is
+// left empty, DeleteTopic looks the broker up through the topic route.
+func WithBrokerAddrDelete(brokerAddr string) OptionDelete {
+	return func(opts *TopicConfigDelete) { opts.BrokerAddr = brokerAddr }
+}
+
+// WithClusterName records a cluster name in the delete configuration.
+// DeleteTopic does not read ClusterName yet, so this currently has no effect.
+func WithClusterName(clusterName string) OptionDelete {
+	return func(opts *TopicConfigDelete) { opts.ClusterName = clusterName }
+}
+
+// WithNameSrvAddr sets the name servers to delete the topic from; the slice is
+// stored as passed. When it is empty, DeleteTopic uses a.namesrv.AddrList().
+func WithNameSrvAddr(nameSrvAddr []string) OptionDelete {
+	return func(opts *TopicConfigDelete) { opts.NameSrvAddr = nameSrvAddr }
+}
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 3129b8d..3504786 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -754,7 +754,7 @@
case ConsumeFromLastOffset:
if lastOffset == -1 {
if strings.HasPrefix(mq.Topic, internal.RetryGroupTopicPrefix) {
- lastOffset = 0
+ result = 0
} else {
lastOffset, err := dc.queryMaxOffset(mq)
if err == nil {
@@ -929,7 +929,7 @@
brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.Topic)
+ brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
}
if brokerAddr == "" {
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
@@ -958,7 +958,7 @@
brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
if brokerAddr == "" {
dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
- brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.Topic)
+ brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
}
if brokerAddr == "" {
return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 0523f08..3a54194 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -81,6 +81,7 @@
return nil, errors.Wrap(err, "new Namesrv failed.")
}
+ defaultOpts.Namesrv = srvs
dc := &defaultConsumer{
client: internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
consumerGroup: defaultOpts.GroupName,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d2aee5b..58945c2 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -705,7 +705,7 @@
time.Sleep(10 * time.Second)
pc.storage.update(request.mq, request.nextOffset, false)
pc.storage.persist([]*primitive.MessageQueue{request.mq})
- pc.processQueueTable.Delete(request.mq)
+ pc.processQueueTable.Delete(*request.mq)
rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
default:
rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
@@ -913,7 +913,7 @@
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
- Msgs: msgs,
+ Msgs: subMsgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -944,14 +944,14 @@
} else {
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
- for i := 0; i < len(msgs); i++ {
+ for i := 0; i < len(subMsgs); i++ {
rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
"message": subMsgs[i],
})
}
} else {
- for i := 0; i < len(msgs); i++ {
- msg := msgs[i]
+ for i := 0; i < len(subMsgs); i++ {
+ msg := subMsgs[i]
if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
@@ -973,7 +973,7 @@
} else {
rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
- "message": msgs,
+ "message": subMsgs,
})
}
})
diff --git a/docs/Introduction.md b/docs/Introduction.md
index dd86c90..a4e954e 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -101,3 +101,28 @@
```
Full examples: [consumer](../examples/consumer)
+
+
+### Admin: Topic Operation
+
+#### Examples
+- create topic
+```
+testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
+err = testAdmin.CreateTopic(
+ context.Background(),
+ admin.WithTopicCreate("newTopic"),
+ admin.WithBrokerAddrCreate("127.0.0.1:10911"),
+)
+```
+
+- delete topic
+`ClusterName` is not supported yet.
+```
+err = testAdmin.DeleteTopic(
+ context.Background(),
+ admin.WithTopicDelete("newTopic"),
+ //admin.WithBrokerAddrDelete("127.0.0.1:10911"), //optional
+ //admin.WithNameSrvAddr(nameSrvAddr), //optional
+)
+```
\ No newline at end of file
diff --git a/docs/feature.md b/docs/feature.md
index 795ffac..eed3e17 100644
--- a/docs/feature.md
+++ b/docs/feature.md
@@ -70,4 +70,17 @@
- [ ] Other
- [ ] VIPChannel
- [ ] RPCHook
-
\ No newline at end of file
+
+## Admin
+
+### Topic/Cluster
+- [x] updateTopic
+- [x] deleteTopic
+- [ ] updateSubGroup
+- [ ] deleteSubGroup
+- [ ] updateBrokerConfig
+- [ ] updateTopicPerm
+- [ ] listTopic
+- [ ] topicRoute
+- [ ] topicStatus
+- [ ] topicClusterList
\ No newline at end of file
diff --git a/examples/admin/topic/main.go b/examples/admin/topic/main.go
new file mode 100644
index 0000000..ef9a536
--- /dev/null
+++ b/examples/admin/topic/main.go
@@ -0,0 +1,64 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/apache/rocketmq-client-go/v2/admin"
+ "github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	topic := "newOne"
+	//clusterName := "DefaultCluster"
+	nameSrvAddr := []string{"127.0.0.1:9876"}
+	brokerAddr := "127.0.0.1:10911"
+
+	testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)))
+	if err != nil {
+		fmt.Println(err.Error())
+		return // testAdmin is nil on error; calling it below would panic
+	}
+
+	//create topic
+	err = testAdmin.CreateTopic(
+		context.Background(),
+		admin.WithTopicCreate(topic),
+		admin.WithBrokerAddrCreate(brokerAddr),
+	)
+	if err != nil {
+		fmt.Println("Create topic error:", err.Error())
+	}
+
+	//deletetopic
+	err = testAdmin.DeleteTopic(
+		context.Background(),
+		admin.WithTopicDelete(topic),
+		//admin.WithBrokerAddrDelete(brokerAddr),
+		//admin.WithNameSrvAddr(nameSrvAddr),
+	)
+	if err != nil {
+		fmt.Println("Delete topic error:", err.Error())
+	}
+
+	if err := testAdmin.Close(); err != nil {
+		fmt.Printf("Shutdown admin error: %s", err.Error())
+	}
+}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 6d8ff2b..a47bbc1 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -120,14 +120,17 @@
s.lock.Lock()
defer s.lock.Unlock()
- addr := s.srvs[s.index]
+ addr := s.srvs[s.index%len(s.srvs)]
index := s.index + 1
if index < 0 {
index = -index
}
index %= len(s.srvs)
s.index = index
- return strings.TrimLeft(addr, "http(s)://")
+ if strings.HasPrefix(addr, "https") {
+ return strings.TrimPrefix(addr, "https://")
+ }
+ return strings.TrimPrefix(addr, "http://")
}
func (s *namesrvs) Size() int {
diff --git a/internal/request.go b/internal/request.go
index 5438790..fa88efe 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -24,25 +24,33 @@
)
const (
- ReqSendMessage = int16(10)
- ReqPullMessage = int16(11)
- ReqQueryConsumerOffset = int16(14)
- ReqUpdateConsumerOffset = int16(15)
- ReqSearchOffsetByTimestamp = int16(29)
- ReqGetMaxOffset = int16(30)
- ReqHeartBeat = int16(34)
- ReqConsumerSendMsgBack = int16(36)
- ReqENDTransaction = int16(37)
- ReqGetConsumerListByGroup = int16(38)
- ReqLockBatchMQ = int16(41)
- ReqUnlockBatchMQ = int16(42)
- ReqGetRouteInfoByTopic = int16(105)
- ReqSendBatchMessage = int16(320)
- ReqCheckTransactionState = int16(39)
- ReqNotifyConsumerIdsChanged = int16(40)
- ReqResetConsuemrOffset = int16(220)
- ReqGetConsumerRunningInfo = int16(307)
- ReqConsumeMessageDirectly = int16(309)
+ ReqSendMessage = int16(10)
+ ReqPullMessage = int16(11)
+ ReqQueryMessage = int16(12)
+ ReqQueryConsumerOffset = int16(14)
+ ReqUpdateConsumerOffset = int16(15)
+ ReqCreateTopic = int16(17)
+ ReqSearchOffsetByTimestamp = int16(29)
+ ReqGetMaxOffset = int16(30)
+ ReqGetMinOffset = int16(31)
+ ReqViewMessageByID = int16(33)
+ ReqHeartBeat = int16(34)
+ ReqConsumerSendMsgBack = int16(36)
+ ReqENDTransaction = int16(37)
+ ReqGetConsumerListByGroup = int16(38)
+ ReqLockBatchMQ = int16(41)
+ ReqUnlockBatchMQ = int16(42)
+ ReqGetRouteInfoByTopic = int16(105)
+ ReqGetBrokerClusterInfo = int16(106)
+ ReqSendBatchMessage = int16(320)
+ ReqCheckTransactionState = int16(39)
+ ReqNotifyConsumerIdsChanged = int16(40)
+ ReqGetAllTopicListFromNameServer = int16(206)
+ ReqDeleteTopicInBroker = int16(215)
+ ReqDeleteTopicInNameSrv = int16(216)
+ ReqResetConsuemrOffset = int16(220)
+ ReqGetConsumerRunningInfo = int16(307)
+ ReqConsumeMessageDirectly = int16(309)
)
type SendMessageRequestHeader struct {
@@ -318,3 +326,84 @@
request.clientID = v
}
}
+
+type QueryMessageRequestHeader struct {
+ Topic string
+ Key string
+ MaxNum int
+ BeginTimestamp int64
+ EndTimestamp int64
+}
+
+// Encode returns the header as a map of field name to string value.
+func (request *QueryMessageRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic":          request.Topic,
+		"key":            request.Key,
+		"maxNum":         strconv.Itoa(request.MaxNum),
+		"beginTimestamp": strconv.FormatInt(request.BeginTimestamp, 10),
+		"endTimestamp":   strconv.FormatInt(request.EndTimestamp, 10),
+	}
+}
+
+func (request *QueryMessageRequestHeader) Decode(properties map[string]string) error {
+ return nil
+}
+
+type ViewMessageRequestHeader struct {
+ Offset int64
+}
+
+// Encode returns the header as a one-entry map holding the decimal offset.
+func (request *ViewMessageRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"offset": strconv.FormatInt(request.Offset, 10),
+	}
+}
+
+type CreateTopicRequestHeader struct {
+ Topic string
+ DefaultTopic string
+ ReadQueueNums int
+ WriteQueueNums int
+ Perm int
+ TopicFilterType string
+ TopicSysFlag int
+ Order bool
+}
+
+// Encode returns the header as a map of field name to string value.
+func (request *CreateTopicRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic":           request.Topic,
+		"defaultTopic":    request.DefaultTopic,
+		"readQueueNums":   strconv.Itoa(request.ReadQueueNums),
+		"writeQueueNums":  strconv.Itoa(request.WriteQueueNums),
+		"perm":            strconv.Itoa(request.Perm),
+		"topicFilterType": request.TopicFilterType,
+		"topicSysFlag":    strconv.Itoa(request.TopicSysFlag),
+		"order":           strconv.FormatBool(request.Order),
+	}
+}
+
+type TopicListRequestHeader struct {
+ Topic string
+}
+
+// Encode returns the header as a one-entry map holding the topic name.
+func (request *TopicListRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic": request.Topic,
+	}
+}
+
+type DeleteTopicRequestHeader struct {
+ Topic string
+}
+
+// Encode returns the header as a one-entry map holding the topic name.
+func (request *DeleteTopicRequestHeader) Encode() map[string]string {
+	return map[string]string{
+		"topic": request.Topic,
+	}
+}
diff --git a/rlog/log.go b/rlog/log.go
index 444ec3f..3e3dfd3 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -42,6 +42,7 @@
Error(msg string, fields map[string]interface{})
Fatal(msg string, fields map[string]interface{})
Level(level string)
+ OutputPath(path string) (err error)
}
func init() {
@@ -102,6 +103,7 @@
}
l.logger.WithFields(fields).Fatal(msg)
}
+
func (l *defaultLogger) Level(level string) {
switch strings.ToLower(level) {
case "debug":
@@ -115,6 +117,17 @@
}
}
+func (l *defaultLogger) OutputPath(path string) (err error) {
+ var file *os.File
+ file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+ if err != nil {
+ return
+ }
+
+ l.logger.Out = file
+ return
+}
+
// SetLogger use specified logger user customized, in general, we suggest user to replace the default logger with specified
func SetLogger(logger Logger) {
rLog = logger
@@ -126,6 +139,14 @@
rLog.Level(level)
}
+// SetOutputPath sends the current logger's output to path; an empty path is a no-op.
+func SetOutputPath(path string) (err error) {
+	if path == "" {
+		return nil
+	}
+	return rLog.OutputPath(path)
+}
+
func Debug(msg string, fields map[string]interface{}) {
rLog.Debug(msg, fields)
}