Merge pull request #554 from lampnick/master

modified typo word Resovler to Resolver
diff --git a/admin/admin.go b/admin/admin.go
new file mode 100644
index 0000000..f45f39a
--- /dev/null
+++ b/admin/admin.go
@@ -0,0 +1,202 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/v2/internal"
+	"github.com/apache/rocketmq-client-go/v2/internal/remote"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+	"github.com/apache/rocketmq-client-go/v2/rlog"
+)
+
+type Admin interface {
+	CreateTopic(ctx context.Context, opts ...OptionCreate) error
+	DeleteTopic(ctx context.Context, opts ...OptionDelete) error
+	//TODO
+	//TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error)
+	//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
+	Close() error
+}
+
+// TODO: move outdated context to ctx
+type adminOptions struct {
+	internal.ClientOptions
+}
+
+type AdminOption func(options *adminOptions)
+
+func defaultAdminOptions() *adminOptions {
+	opts := &adminOptions{
+		ClientOptions: internal.DefaultClientOptions(),
+	}
+	opts.GroupName = "TOOLS_ADMIN"
+	opts.InstanceName = time.Now().String()
+	return opts
+}
+
+// WithResolver nameserver resolver to fetch nameserver addr
+func WithResolver(resolver primitive.NsResolver) AdminOption {
+	return func(options *adminOptions) {
+		options.Resolver = resolver
+	}
+}
+
+type admin struct {
+	cli     internal.RMQClient
+	namesrv internal.Namesrvs
+
+	opts *adminOptions
+
+	closeOnce sync.Once
+}
+
+// NewAdmin initialize admin
+func NewAdmin(opts ...AdminOption) (Admin, error) {
+	defaultOpts := defaultAdminOptions()
+	for _, opt := range opts {
+		opt(defaultOpts)
+	}
+
+	cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil)
+	namesrv, err := internal.NewNamesrv(defaultOpts.Resolver)
+	if err != nil {
+		return nil, err
+	}
+	//log.Printf("Client: %#v", namesrv.srvs)
+	return &admin{
+		cli:     cli,
+		namesrv: namesrv,
+		opts:    defaultOpts,
+	}, nil
+}
+
+// CreateTopic create topic.
+// TODO: another implementation like sarama, without brokerAddr as input
+func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
+	cfg := defaultTopicConfigCreate()
+	for _, apply := range opts {
+		apply(&cfg)
+	}
+
+	request := &internal.CreateTopicRequestHeader{
+		Topic:           cfg.Topic,
+		DefaultTopic:    cfg.DefaultTopic,
+		ReadQueueNums:   cfg.ReadQueueNums,
+		WriteQueueNums:  cfg.WriteQueueNums,
+		Perm:            cfg.Perm,
+		TopicFilterType: cfg.TopicFilterType,
+		TopicSysFlag:    cfg.TopicSysFlag,
+		Order:           cfg.Order,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil)
+	_, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second)
+	if err != nil {
+		rlog.Error("create topic error", map[string]interface{}{
+			rlog.LogKeyTopic:         cfg.Topic,
+			rlog.LogKeyBroker:        cfg.BrokerAddr,
+			rlog.LogKeyUnderlayError: err,
+		})
+	} else {
+		rlog.Info("create topic success", map[string]interface{}{
+			rlog.LogKeyTopic:  cfg.Topic,
+			rlog.LogKeyBroker: cfg.BrokerAddr,
+		})
+	}
+	return err
+}
+
+// DeleteTopicInBroker delete topic in broker.
+func (a *admin) deleteTopicInBroker(ctx context.Context, topic string, brokerAddr string) (*remote.RemotingCommand, error) {
+	request := &internal.DeleteTopicRequestHeader{
+		Topic: topic,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInBroker, request, nil)
+	return a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second)
+}
+
+// DeleteTopicInNameServer delete topic in nameserver.
+func (a *admin) deleteTopicInNameServer(ctx context.Context, topic string, nameSrvAddr string) (*remote.RemotingCommand, error) {
+	request := &internal.DeleteTopicRequestHeader{
+		Topic: topic,
+	}
+
+	cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInNameSrv, request, nil)
+	return a.cli.InvokeSync(ctx, nameSrvAddr, cmd, 5*time.Second)
+}
+
+// DeleteTopic delete topic in both broker and nameserver.
+func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
+	cfg := defaultTopicConfigDelete()
+	for _, apply := range opts {
+		apply(&cfg)
+	}
+	//delete topic in broker
+	if cfg.BrokerAddr == "" {
+		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
+	}
+
+	if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
+		if err != nil {
+			rlog.Error("delete topic in broker error", map[string]interface{}{
+				rlog.LogKeyTopic:         cfg.Topic,
+				rlog.LogKeyBroker:        cfg.BrokerAddr,
+				rlog.LogKeyUnderlayError: err,
+			})
+		}
+		return err
+	}
+
+	//delete topic in nameserver
+	if len(cfg.NameSrvAddr) == 0 {
+		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
+		cfg.NameSrvAddr = a.namesrv.AddrList()
+	}
+
+	for _, nameSrvAddr := range cfg.NameSrvAddr {
+		if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
+			if err != nil {
+				rlog.Error("delete topic in name server error", map[string]interface{}{
+					rlog.LogKeyTopic:         cfg.Topic,
+					"nameServer":             nameSrvAddr,
+					rlog.LogKeyUnderlayError: err,
+				})
+			}
+			return err
+		}
+	}
+	rlog.Info("delete topic success", map[string]interface{}{
+		rlog.LogKeyTopic:  cfg.Topic,
+		rlog.LogKeyBroker: cfg.BrokerAddr,
+		"nameServer":      cfg.NameSrvAddr,
+	})
+	return nil
+}
+
+func (a *admin) Close() error {
+	a.closeOnce.Do(func() {
+		a.cli.Shutdown()
+	})
+	return nil
+}
diff --git a/admin/option.go b/admin/option.go
new file mode 100644
index 0000000..d5a648e
--- /dev/null
+++ b/admin/option.go
@@ -0,0 +1,131 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package admin
+
+func defaultTopicConfigCreate() TopicConfigCreate {
+	opts := TopicConfigCreate{
+		DefaultTopic:    "defaultTopic",
+		ReadQueueNums:   8,
+		WriteQueueNums:  8,
+		Perm:            6,
+		TopicFilterType: "SINGLE_TAG",
+		TopicSysFlag:    0,
+		Order:           false,
+	}
+	return opts
+}
+
+type TopicConfigCreate struct {
+	Topic           string
+	BrokerAddr      string
+	DefaultTopic    string
+	ReadQueueNums   int
+	WriteQueueNums  int
+	Perm            int
+	TopicFilterType string
+	TopicSysFlag    int
+	Order           bool
+}
+
+type OptionCreate func(*TopicConfigCreate)
+
+func WithTopicCreate(Topic string) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.Topic = Topic
+	}
+}
+
+func WithBrokerAddrCreate(BrokerAddr string) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.BrokerAddr = BrokerAddr
+	}
+}
+
+func WithReadQueueNums(ReadQueueNums int) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.ReadQueueNums = ReadQueueNums
+	}
+}
+
+func WithWriteQueueNums(WriteQueueNums int) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.WriteQueueNums = WriteQueueNums
+	}
+}
+
+func WithPerm(Perm int) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.Perm = Perm
+	}
+}
+
+func WithTopicFilterType(TopicFilterType string) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.TopicFilterType = TopicFilterType
+	}
+}
+
+func WithTopicSysFlag(TopicSysFlag int) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.TopicSysFlag = TopicSysFlag
+	}
+}
+
+func WithOrder(Order bool) OptionCreate {
+	return func(opts *TopicConfigCreate) {
+		opts.Order = Order
+	}
+}
+
+func defaultTopicConfigDelete() TopicConfigDelete {
+	opts := TopicConfigDelete{}
+	return opts
+}
+
+type TopicConfigDelete struct {
+	Topic       string
+	ClusterName string
+	NameSrvAddr []string
+	BrokerAddr  string
+}
+
+type OptionDelete func(*TopicConfigDelete)
+
+func WithTopicDelete(Topic string) OptionDelete {
+	return func(opts *TopicConfigDelete) {
+		opts.Topic = Topic
+	}
+}
+
+func WithBrokerAddrDelete(BrokerAddr string) OptionDelete {
+	return func(opts *TopicConfigDelete) {
+		opts.BrokerAddr = BrokerAddr
+	}
+}
+
+func WithClusterName(ClusterName string) OptionDelete {
+	return func(opts *TopicConfigDelete) {
+		opts.ClusterName = ClusterName
+	}
+}
+
+func WithNameSrvAddr(NameSrvAddr []string) OptionDelete {
+	return func(opts *TopicConfigDelete) {
+		opts.NameSrvAddr = NameSrvAddr
+	}
+}
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 3129b8d..3504786 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -754,7 +754,7 @@
 		case ConsumeFromLastOffset:
 			if lastOffset == -1 {
 				if strings.HasPrefix(mq.Topic, internal.RetryGroupTopicPrefix) {
-					lastOffset = 0
+					result = 0
 				} else {
 					lastOffset, err := dc.queryMaxOffset(mq)
 					if err == nil {
@@ -929,7 +929,7 @@
 	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
 	if brokerAddr == "" {
 		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
-		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
 	}
 	if brokerAddr == "" {
 		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
@@ -958,7 +958,7 @@
 	brokerAddr := dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
 	if brokerAddr == "" {
 		dc.namesrv.UpdateTopicRouteInfo(mq.Topic)
-		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.Topic)
+		brokerAddr = dc.namesrv.FindBrokerAddrByName(mq.BrokerName)
 	}
 	if brokerAddr == "" {
 		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 0523f08..3a54194 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -81,6 +81,7 @@
 		return nil, errors.Wrap(err, "new Namesrv failed.")
 	}
 
+	defaultOpts.Namesrv = srvs
 	dc := &defaultConsumer{
 		client:        internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil),
 		consumerGroup: defaultOpts.GroupName,
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d2aee5b..58945c2 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -705,7 +705,7 @@
 			time.Sleep(10 * time.Second)
 			pc.storage.update(request.mq, request.nextOffset, false)
 			pc.storage.persist([]*primitive.MessageQueue{request.mq})
-			pc.processQueueTable.Delete(request.mq)
+			pc.processQueueTable.Delete(*request.mq)
 			rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
 		default:
 			rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
@@ -913,7 +913,7 @@
 				Properties:    make(map[string]string),
 				ConsumerGroup: pc.consumerGroup,
 				MQ:            mq,
-				Msgs:          msgs,
+				Msgs:          subMsgs,
 			}
 			ctx := context.Background()
 			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -944,14 +944,14 @@
 				} else {
 					increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
 					if pc.model == BroadCasting {
-						for i := 0; i < len(msgs); i++ {
+						for i := 0; i < len(subMsgs); i++ {
 							rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
 								"message": subMsgs[i],
 							})
 						}
 					} else {
-						for i := 0; i < len(msgs); i++ {
-							msg := msgs[i]
+						for i := 0; i < len(subMsgs); i++ {
+							msg := subMsgs[i]
 							if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
 								msg.ReconsumeTimes += 1
 								msgBackFailed = append(msgBackFailed, msg)
@@ -973,7 +973,7 @@
 			} else {
 				rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
 					rlog.LogKeyMessageQueue: mq,
-					"message":               msgs,
+					"message":               subMsgs,
 				})
 			}
 		})
diff --git a/docs/Introduction.md b/docs/Introduction.md
index dd86c90..a4e954e 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -101,3 +101,28 @@
 ```
 
 Full examples: [consumer](../examples/consumer)
+
+
+### Admin: Topic Operation
+
+#### Examples
+- create topic
+```
+testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})))
+err = testAdmin.CreateTopic(
+	context.Background(),
+	admin.WithTopicCreate("newTopic"),
+	admin.WithBrokerAddrCreate("127.0.0.1:10911"),
+)
+```
+
+- delete topic
+`ClusterName` not supported yet
+```
+err = testAdmin.DeleteTopic(
+	context.Background(),
+	admin.WithTopicDelete("newTopic"),
+	//admin.WithBrokerAddrDelete("127.0.0.1:10911"),	//optional
+	//admin.WithNameSrvAddr(nameSrvAddr),				//optional
+)
+```
\ No newline at end of file
diff --git a/docs/feature.md b/docs/feature.md
index 795ffac..eed3e17 100644
--- a/docs/feature.md
+++ b/docs/feature.md
@@ -70,4 +70,17 @@
 - [ ] Other
     - [ ] VIPChannel
     - [ ] RPCHook
-    
\ No newline at end of file
+    
+## Admin
+
+### Topic/Cluster
+- [x] updateTopic
+- [x] deleteTopic
+- [ ] updateSubGroup
+- [ ] deleteSubGroup
+- [ ] updateBrokerConfig
+- [ ] updateTopicPerm
+- [ ] listTopic
+- [ ] topicRoute
+- [ ] topicStatus
+- [ ] topicClusterList
\ No newline at end of file
diff --git a/examples/admin/topic/main.go b/examples/admin/topic/main.go
new file mode 100644
index 0000000..ef9a536
--- /dev/null
+++ b/examples/admin/topic/main.go
@@ -0,0 +1,64 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/apache/rocketmq-client-go/v2/admin"
+	"github.com/apache/rocketmq-client-go/v2/primitive"
+)
+
+func main() {
+	topic := "newOne"
+	//clusterName := "DefaultCluster"
+	nameSrvAddr := []string{"127.0.0.1:9876"}
+	brokerAddr := "127.0.0.1:10911"
+
+	testAdmin, err := admin.NewAdmin(admin.WithResolver(primitive.NewPassthroughResolver(nameSrvAddr)))
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+
+	//create topic
+	err = testAdmin.CreateTopic(
+		context.Background(),
+		admin.WithTopicCreate(topic),
+		admin.WithBrokerAddrCreate(brokerAddr),
+	)
+	if err != nil {
+		fmt.Println("Create topic error:", err.Error())
+	}
+
+	//deletetopic
+	err = testAdmin.DeleteTopic(
+		context.Background(),
+		admin.WithTopicDelete(topic),
+		//admin.WithBrokerAddrDelete(brokerAddr),
+		//admin.WithNameSrvAddr(nameSrvAddr),
+	)
+	if err != nil {
+		fmt.Println("Delete topic error:", err.Error())
+	}
+
+	err = testAdmin.Close()
+	if err != nil {
+		fmt.Printf("Shutdown admin error: %s", err.Error())
+	}
+}
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 6d8ff2b..a47bbc1 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -120,14 +120,17 @@
 	s.lock.Lock()
 	defer s.lock.Unlock()
 
-	addr := s.srvs[s.index]
+	addr := s.srvs[s.index%len(s.srvs)]
 	index := s.index + 1
 	if index < 0 {
 		index = -index
 	}
 	index %= len(s.srvs)
 	s.index = index
-	return strings.TrimLeft(addr, "http(s)://")
+	if strings.HasPrefix(addr, "https") {
+		return strings.TrimPrefix(addr, "https://")
+	}
+	return strings.TrimPrefix(addr, "http://")
 }
 
 func (s *namesrvs) Size() int {
diff --git a/internal/request.go b/internal/request.go
index 5438790..fa88efe 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -24,25 +24,33 @@
 )
 
 const (
-	ReqSendMessage              = int16(10)
-	ReqPullMessage              = int16(11)
-	ReqQueryConsumerOffset      = int16(14)
-	ReqUpdateConsumerOffset     = int16(15)
-	ReqSearchOffsetByTimestamp  = int16(29)
-	ReqGetMaxOffset             = int16(30)
-	ReqHeartBeat                = int16(34)
-	ReqConsumerSendMsgBack      = int16(36)
-	ReqENDTransaction           = int16(37)
-	ReqGetConsumerListByGroup   = int16(38)
-	ReqLockBatchMQ              = int16(41)
-	ReqUnlockBatchMQ            = int16(42)
-	ReqGetRouteInfoByTopic      = int16(105)
-	ReqSendBatchMessage         = int16(320)
-	ReqCheckTransactionState    = int16(39)
-	ReqNotifyConsumerIdsChanged = int16(40)
-	ReqResetConsuemrOffset      = int16(220)
-	ReqGetConsumerRunningInfo   = int16(307)
-	ReqConsumeMessageDirectly   = int16(309)
+	ReqSendMessage                   = int16(10)
+	ReqPullMessage                   = int16(11)
+	ReqQueryMessage                  = int16(12)
+	ReqQueryConsumerOffset           = int16(14)
+	ReqUpdateConsumerOffset          = int16(15)
+	ReqCreateTopic                   = int16(17)
+	ReqSearchOffsetByTimestamp       = int16(29)
+	ReqGetMaxOffset                  = int16(30)
+	ReqGetMinOffset                  = int16(31)
+	ReqViewMessageByID               = int16(33)
+	ReqHeartBeat                     = int16(34)
+	ReqConsumerSendMsgBack           = int16(36)
+	ReqENDTransaction                = int16(37)
+	ReqGetConsumerListByGroup        = int16(38)
+	ReqLockBatchMQ                   = int16(41)
+	ReqUnlockBatchMQ                 = int16(42)
+	ReqGetRouteInfoByTopic           = int16(105)
+	ReqGetBrokerClusterInfo          = int16(106)
+	ReqSendBatchMessage              = int16(320)
+	ReqCheckTransactionState         = int16(39)
+	ReqNotifyConsumerIdsChanged      = int16(40)
+	ReqGetAllTopicListFromNameServer = int16(206)
+	ReqDeleteTopicInBroker           = int16(215)
+	ReqDeleteTopicInNameSrv          = int16(216)
+	ReqResetConsuemrOffset           = int16(220)
+	ReqGetConsumerRunningInfo        = int16(307)
+	ReqConsumeMessageDirectly        = int16(309)
 )
 
 type SendMessageRequestHeader struct {
@@ -318,3 +326,84 @@
 		request.clientID = v
 	}
 }
+
+type QueryMessageRequestHeader struct {
+	Topic          string
+	Key            string
+	MaxNum         int
+	BeginTimestamp int64
+	EndTimestamp   int64
+}
+
+func (request *QueryMessageRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+	maps["key"] = request.Key
+	maps["maxNum"] = fmt.Sprintf("%d", request.MaxNum)
+	maps["beginTimestamp"] = strconv.FormatInt(request.BeginTimestamp, 10)
+	maps["endTimestamp"] = fmt.Sprintf("%d", request.EndTimestamp)
+
+	return maps
+}
+
+func (request *QueryMessageRequestHeader) Decode(properties map[string]string) error {
+	return nil
+}
+
+type ViewMessageRequestHeader struct {
+	Offset int64
+}
+
+func (request *ViewMessageRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["offset"] = strconv.FormatInt(request.Offset, 10)
+
+	return maps
+}
+
+type CreateTopicRequestHeader struct {
+	Topic           string
+	DefaultTopic    string
+	ReadQueueNums   int
+	WriteQueueNums  int
+	Perm            int
+	TopicFilterType string
+	TopicSysFlag    int
+	Order           bool
+}
+
+func (request *CreateTopicRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+	maps["defaultTopic"] = request.DefaultTopic
+	maps["readQueueNums"] = fmt.Sprintf("%d", request.ReadQueueNums)
+	maps["writeQueueNums"] = fmt.Sprintf("%d", request.WriteQueueNums)
+	maps["perm"] = fmt.Sprintf("%d", request.Perm)
+	maps["topicFilterType"] = request.TopicFilterType
+	maps["topicSysFlag"] = fmt.Sprintf("%d", request.TopicSysFlag)
+	maps["order"] = strconv.FormatBool(request.Order)
+
+	return maps
+}
+
+type TopicListRequestHeader struct {
+	Topic string
+}
+
+func (request *TopicListRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+
+	return maps
+}
+
+type DeleteTopicRequestHeader struct {
+	Topic string
+}
+
+func (request *DeleteTopicRequestHeader) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+
+	return maps
+}
diff --git a/rlog/log.go b/rlog/log.go
index 444ec3f..3e3dfd3 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -42,6 +42,7 @@
 	Error(msg string, fields map[string]interface{})
 	Fatal(msg string, fields map[string]interface{})
 	Level(level string)
+	OutputPath(path string) (err error)
 }
 
 func init() {
@@ -102,6 +103,7 @@
 	}
 	l.logger.WithFields(fields).Fatal(msg)
 }
+
 func (l *defaultLogger) Level(level string) {
 	switch strings.ToLower(level) {
 	case "debug":
@@ -115,6 +117,17 @@
 	}
 }
 
+func (l *defaultLogger) OutputPath(path string) (err error) {
+	var file *os.File
+	file, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
+	if err != nil {
+		return
+	}
+
+	l.logger.Out = file
+	return
+}
+
 // SetLogger use specified logger user customized, in general, we suggest user to replace the default logger with specified
 func SetLogger(logger Logger) {
 	rLog = logger
@@ -126,6 +139,14 @@
 	rLog.Level(level)
 }
 
+func SetOutputPath(path string) (err error) {
+	if "" == path {
+		return
+	}
+
+	return rLog.OutputPath(path)
+}
+
 func Debug(msg string, fields map[string]interface{}) {
 	rLog.Debug(msg, fields)
 }