| /* |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package admin |
| |
| import ( |
| "context" |
| "sync" |
| "time" |
| |
| "github.com/apache/rocketmq-client-go/v2/internal" |
| "github.com/apache/rocketmq-client-go/v2/internal/remote" |
| "github.com/apache/rocketmq-client-go/v2/primitive" |
| "github.com/apache/rocketmq-client-go/v2/rlog" |
| ) |
| |
| type Admin interface { |
| CreateTopic(ctx context.Context, opts ...OptionCreate) error |
| DeleteTopic(ctx context.Context, opts ...OptionDelete) error |
| //TODO |
| //TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error) |
| //GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error) |
| Close() error |
| } |
| |
| // TODO: move outdated context to ctx |
| type adminOptions struct { |
| internal.ClientOptions |
| } |
| |
| type AdminOption func(options *adminOptions) |
| |
| func defaultAdminOptions() *adminOptions { |
| opts := &adminOptions{ |
| ClientOptions: internal.DefaultClientOptions(), |
| } |
| opts.GroupName = "TOOLS_ADMIN" |
| opts.InstanceName = time.Now().String() |
| return opts |
| } |
| |
| // WithResolver nameserver resolver to fetch nameserver addr |
| func WithResolver(resolver primitive.NsResolver) AdminOption { |
| return func(options *adminOptions) { |
| options.Resolver = resolver |
| } |
| } |
| |
| type admin struct { |
| cli internal.RMQClient |
| namesrv internal.Namesrvs |
| |
| opts *adminOptions |
| |
| closeOnce sync.Once |
| } |
| |
| // NewAdmin initialize admin |
| func NewAdmin(opts ...AdminOption) (Admin, error) { |
| defaultOpts := defaultAdminOptions() |
| for _, opt := range opts { |
| opt(defaultOpts) |
| } |
| |
| cli := internal.GetOrNewRocketMQClient(defaultOpts.ClientOptions, nil) |
| namesrv, err := internal.NewNamesrv(defaultOpts.Resolver) |
| if err != nil { |
| return nil, err |
| } |
| //log.Printf("Client: %#v", namesrv.srvs) |
| return &admin{ |
| cli: cli, |
| namesrv: namesrv, |
| opts: defaultOpts, |
| }, nil |
| } |
| |
| // CreateTopic create topic. |
| // TODO: another implementation like sarama, without brokerAddr as input |
| func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error { |
| cfg := defaultTopicConfigCreate() |
| for _, apply := range opts { |
| apply(&cfg) |
| } |
| |
| request := &internal.CreateTopicRequestHeader{ |
| Topic: cfg.Topic, |
| DefaultTopic: cfg.DefaultTopic, |
| ReadQueueNums: cfg.ReadQueueNums, |
| WriteQueueNums: cfg.WriteQueueNums, |
| Perm: cfg.Perm, |
| TopicFilterType: cfg.TopicFilterType, |
| TopicSysFlag: cfg.TopicSysFlag, |
| Order: cfg.Order, |
| } |
| |
| cmd := remote.NewRemotingCommand(internal.ReqCreateTopic, request, nil) |
| _, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, cmd, 5*time.Second) |
| if err != nil { |
| rlog.Error("create topic error", map[string]interface{}{ |
| rlog.LogKeyTopic: cfg.Topic, |
| rlog.LogKeyBroker: cfg.BrokerAddr, |
| rlog.LogKeyUnderlayError: err, |
| }) |
| } else { |
| rlog.Info("create topic success", map[string]interface{}{ |
| rlog.LogKeyTopic: cfg.Topic, |
| rlog.LogKeyBroker: cfg.BrokerAddr, |
| }) |
| } |
| return err |
| } |
| |
| // DeleteTopicInBroker delete topic in broker. |
| func (a *admin) deleteTopicInBroker(ctx context.Context, topic string, brokerAddr string) (*remote.RemotingCommand, error) { |
| request := &internal.DeleteTopicRequestHeader{ |
| Topic: topic, |
| } |
| |
| cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInBroker, request, nil) |
| return a.cli.InvokeSync(ctx, brokerAddr, cmd, 5*time.Second) |
| } |
| |
| // DeleteTopicInNameServer delete topic in nameserver. |
| func (a *admin) deleteTopicInNameServer(ctx context.Context, topic string, nameSrvAddr string) (*remote.RemotingCommand, error) { |
| request := &internal.DeleteTopicRequestHeader{ |
| Topic: topic, |
| } |
| |
| cmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInNameSrv, request, nil) |
| return a.cli.InvokeSync(ctx, nameSrvAddr, cmd, 5*time.Second) |
| } |
| |
| // DeleteTopic delete topic in both broker and nameserver. |
| func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error { |
| cfg := defaultTopicConfigDelete() |
| for _, apply := range opts { |
| apply(&cfg) |
| } |
| //delete topic in broker |
| if cfg.BrokerAddr == "" { |
| a.namesrv.UpdateTopicRouteInfo(cfg.Topic) |
| cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic) |
| } |
| |
| if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil { |
| if err != nil { |
| rlog.Error("delete topic in broker error", map[string]interface{}{ |
| rlog.LogKeyTopic: cfg.Topic, |
| rlog.LogKeyBroker: cfg.BrokerAddr, |
| rlog.LogKeyUnderlayError: err, |
| }) |
| } |
| return err |
| } |
| |
| //delete topic in nameserver |
| if len(cfg.NameSrvAddr) == 0 { |
| a.namesrv.UpdateTopicRouteInfo(cfg.Topic) |
| cfg.NameSrvAddr = a.namesrv.AddrList() |
| } |
| |
| for _, nameSrvAddr := range cfg.NameSrvAddr { |
| if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil { |
| if err != nil { |
| rlog.Error("delete topic in name server error", map[string]interface{}{ |
| rlog.LogKeyTopic: cfg.Topic, |
| "nameServer": nameSrvAddr, |
| rlog.LogKeyUnderlayError: err, |
| }) |
| } |
| return err |
| } |
| } |
| rlog.Info("delete topic success", map[string]interface{}{ |
| rlog.LogKeyTopic: cfg.Topic, |
| rlog.LogKeyBroker: cfg.BrokerAddr, |
| "nameServer": cfg.NameSrvAddr, |
| }) |
| return nil |
| } |
| |
| func (a *admin) Close() error { |
| a.closeOnce.Do(func() { |
| a.cli.Shutdown() |
| }) |
| return nil |
| } |