blob: f45f39aee8b442a41d3e2680540ee3b72d3c9675 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package admin
import (
"context"
"sync"
"time"
"github.com/apache/rocketmq-client-go/v2/internal"
"github.com/apache/rocketmq-client-go/v2/internal/remote"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
)
// Admin is the administrative client surface exposed by this package:
// creating topics, deleting topics, and closing the client.
type Admin interface {
// CreateTopic creates a topic; the request is described by the supplied
// OptionCreate values.
CreateTopic(ctx context.Context, opts ...OptionCreate) error
// DeleteTopic deletes a topic; the request is described by the supplied
// OptionDelete values.
DeleteTopic(ctx context.Context, opts ...OptionDelete) error
//TODO
//TopicList(ctx context.Context, mq *primitive.MessageQueue) (*remote.RemotingCommand, error)
//GetBrokerClusterInfo(ctx context.Context) (*remote.RemotingCommand, error)
// Close shuts down the underlying client.
Close() error
}
// TODO: move outdated context to ctx

// adminOptions is the configuration of an admin client. It embeds
// internal.ClientOptions, so the client option fields (for example GroupName,
// InstanceName and Resolver) are accessible directly on adminOptions.
type adminOptions struct {
internal.ClientOptions
}
// AdminOption customizes an adminOptions value in place. NewAdmin applies the
// supplied options, in order, on top of the defaults.
type AdminOption func(options *adminOptions)
// defaultAdminOptions returns the baseline admin configuration: the default
// client options with the group name set to "TOOLS_ADMIN" and the instance
// name set to the string form of the current time.
func defaultAdminOptions() *adminOptions {
	clientOpts := internal.DefaultClientOptions()
	clientOpts.GroupName = "TOOLS_ADMIN"
	clientOpts.InstanceName = time.Now().String()

	return &adminOptions{ClientOptions: clientOpts}
}
// WithResolver returns an AdminOption that installs resolver as the name
// server resolver (the Resolver field) of the admin options.
func WithResolver(resolver primitive.NsResolver) AdminOption {
	setResolver := func(o *adminOptions) {
		o.Resolver = resolver
	}
	return setResolver
}
// admin is the implementation of Admin returned by NewAdmin.
type admin struct {
// cli sends the remoting commands and is shut down by Close.
cli internal.RMQClient
// namesrv refreshes topic routes and supplies broker and name server
// addresses when a delete request does not specify them.
namesrv internal.Namesrvs
// opts is the configuration this admin was built with.
opts *adminOptions
// closeOnce makes the client shutdown in Close run at most once.
closeOnce sync.Once
}
// NewAdmin builds an Admin from the default admin options with each supplied
// AdminOption applied in order.
//
// It obtains a client for the resulting client options and creates a name
// server handle from the configured resolver. If the name server handle
// cannot be created, that error is returned and the Admin is nil.
//
// NOTE(review): the client is obtained before the name server handle is
// created and is not released on the error path — confirm whether that
// matters for GetOrNewRocketMQClient.
func NewAdmin(opts ...AdminOption) (Admin, error) {
	cfg := defaultAdminOptions()
	for i := range opts {
		opts[i](cfg)
	}

	client := internal.GetOrNewRocketMQClient(cfg.ClientOptions, nil)

	nameServers, err := internal.NewNamesrv(cfg.Resolver)
	if err != nil {
		return nil, err
	}

	adm := &admin{
		cli:     client,
		namesrv: nameServers,
		opts:    cfg,
	}
	return adm, nil
}
// CreateTopic sends a create-topic request to the broker at the configured
// broker address.
//
// The request is built from defaultTopicConfigCreate with each OptionCreate
// applied in order, and is invoked synchronously with a 5 second timeout.
// An invocation error is logged and returned; otherwise success is logged and
// nil is returned.
//
// NOTE(review): the response command is discarded, so only the error returned
// by InvokeSync is reported — confirm whether the response should also be
// inspected.
//
// TODO: another implementation like sarama, without brokerAddr as input
func (a *admin) CreateTopic(ctx context.Context, opts ...OptionCreate) error {
	cfg := defaultTopicConfigCreate()
	for _, apply := range opts {
		apply(&cfg)
	}

	header := &internal.CreateTopicRequestHeader{
		Topic:           cfg.Topic,
		DefaultTopic:    cfg.DefaultTopic,
		ReadQueueNums:   cfg.ReadQueueNums,
		WriteQueueNums:  cfg.WriteQueueNums,
		Perm:            cfg.Perm,
		TopicFilterType: cfg.TopicFilterType,
		TopicSysFlag:    cfg.TopicSysFlag,
		Order:           cfg.Order,
	}
	createCmd := remote.NewRemotingCommand(internal.ReqCreateTopic, header, nil)

	if _, err := a.cli.InvokeSync(ctx, cfg.BrokerAddr, createCmd, 5*time.Second); err != nil {
		rlog.Error("create topic error", map[string]interface{}{
			rlog.LogKeyTopic:         cfg.Topic,
			rlog.LogKeyBroker:        cfg.BrokerAddr,
			rlog.LogKeyUnderlayError: err,
		})
		return err
	}

	rlog.Info("create topic success", map[string]interface{}{
		rlog.LogKeyTopic:  cfg.Topic,
		rlog.LogKeyBroker: cfg.BrokerAddr,
	})
	return nil
}
// deleteTopicInBroker sends a delete-topic request for topic to the broker at
// brokerAddr. The request is invoked synchronously with a 5 second timeout;
// the response command and any invocation error are returned unchanged.
func (a *admin) deleteTopicInBroker(ctx context.Context, topic string, brokerAddr string) (*remote.RemotingCommand, error) {
	header := &internal.DeleteTopicRequestHeader{Topic: topic}
	deleteCmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInBroker, header, nil)

	return a.cli.InvokeSync(ctx, brokerAddr, deleteCmd, 5*time.Second)
}
// deleteTopicInNameServer sends a delete-topic request for topic to the name
// server at nameSrvAddr. The request is invoked synchronously with a 5 second
// timeout; the response command and any invocation error are returned
// unchanged.
func (a *admin) deleteTopicInNameServer(ctx context.Context, topic string, nameSrvAddr string) (*remote.RemotingCommand, error) {
	header := &internal.DeleteTopicRequestHeader{Topic: topic}
	deleteCmd := remote.NewRemotingCommand(internal.ReqDeleteTopicInNameSrv, header, nil)

	return a.cli.InvokeSync(ctx, nameSrvAddr, deleteCmd, 5*time.Second)
}
// DeleteTopic deletes a topic from a broker and then from each name server.
//
// The request is built from defaultTopicConfigDelete with each OptionDelete
// applied in order. When no broker address is configured, the topic route is
// refreshed and the broker address is looked up by topic. When no name server
// addresses are configured, the route is refreshed again and the name server
// address list is used.
//
// Deletion stops at the first failure: the error is logged and returned, and
// any remaining name servers are not contacted. nil is returned once the
// broker and every name server request has succeeded.
func (a *admin) DeleteTopic(ctx context.Context, opts ...OptionDelete) error {
	cfg := defaultTopicConfigDelete()
	for _, apply := range opts {
		apply(&cfg)
	}

	// Delete the topic in the broker.
	if cfg.BrokerAddr == "" {
		// NOTE(review): whatever UpdateTopicRouteInfo reports is ignored, and the
		// looked-up address is used as-is — confirm how a missing route behaves.
		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
		cfg.BrokerAddr = a.namesrv.FindBrokerAddrByTopic(cfg.Topic)
	}
	if _, err := a.deleteTopicInBroker(ctx, cfg.Topic, cfg.BrokerAddr); err != nil {
		rlog.Error("delete topic in broker error", map[string]interface{}{
			rlog.LogKeyTopic:         cfg.Topic,
			rlog.LogKeyBroker:        cfg.BrokerAddr,
			rlog.LogKeyUnderlayError: err,
		})
		return err
	}

	// Delete the topic in every name server.
	if len(cfg.NameSrvAddr) == 0 {
		a.namesrv.UpdateTopicRouteInfo(cfg.Topic)
		cfg.NameSrvAddr = a.namesrv.AddrList()
	}
	for _, nameSrvAddr := range cfg.NameSrvAddr {
		if _, err := a.deleteTopicInNameServer(ctx, cfg.Topic, nameSrvAddr); err != nil {
			rlog.Error("delete topic in name server error", map[string]interface{}{
				rlog.LogKeyTopic:         cfg.Topic,
				"nameServer":             nameSrvAddr,
				rlog.LogKeyUnderlayError: err,
			})
			return err
		}
	}

	rlog.Info("delete topic success", map[string]interface{}{
		rlog.LogKeyTopic:  cfg.Topic,
		rlog.LogKeyBroker: cfg.BrokerAddr,
		"nameServer":      cfg.NameSrvAddr,
	})
	return nil
}
// Close shuts down the underlying client. The shutdown is guarded by a
// sync.Once, so it runs at most once no matter how many times Close is called.
// Close always returns nil.
func (a *admin) Close() error {
	shutdown := func() {
		a.cli.Shutdown()
	}
	a.closeOnce.Do(shutdown)

	return nil
}