blob: 93b7185eaddac8027fafde7aa79b875fe0ad3d6e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"context"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Clusters is the admin interface for cluster management.
//
// Every operation comes in two forms: a plain method and a WithContext
// variant that takes an explicit context.Context as its first argument.
type Clusters interface {
	// List returns the list of clusters.
	List() ([]string, error)

	// ListWithContext returns the list of clusters.
	ListWithContext(ctx context.Context) ([]string, error)

	// Get returns the configuration data for the specified cluster.
	Get(name string) (utils.ClusterData, error)

	// GetWithContext returns the configuration data for the specified cluster.
	GetWithContext(ctx context.Context, name string) (utils.ClusterData, error)

	// Create creates a new cluster.
	Create(cdata utils.ClusterData) error

	// CreateWithContext creates a new cluster.
	CreateWithContext(ctx context.Context, cdata utils.ClusterData) error

	// Delete deletes an existing cluster.
	Delete(name string) error

	// DeleteWithContext deletes an existing cluster.
	DeleteWithContext(ctx context.Context, name string) error

	// Update updates the configuration for a cluster.
	Update(cdata utils.ClusterData) error

	// UpdateWithContext updates the configuration for a cluster.
	UpdateWithContext(ctx context.Context, cdata utils.ClusterData) error

	// UpdatePeerClusters updates peer cluster names.
	UpdatePeerClusters(cluster string, peerClusters []string) error

	// UpdatePeerClustersWithContext updates peer cluster names.
	UpdatePeerClustersWithContext(ctx context.Context, cluster string, peerClusters []string) error

	// GetPeerClusters returns peer-cluster names.
	GetPeerClusters(name string) ([]string, error)

	// GetPeerClustersWithContext returns peer-cluster names.
	GetPeerClustersWithContext(ctx context.Context, name string) ([]string, error)

	// CreateFailureDomain creates a domain into cluster.
	CreateFailureDomain(data utils.FailureDomainData) error

	// CreateFailureDomainWithContext creates a domain into cluster.
	CreateFailureDomainWithContext(ctx context.Context, data utils.FailureDomainData) error

	// GetFailureDomain returns the domain registered into a cluster.
	GetFailureDomain(clusterName, domainName string) (utils.FailureDomainData, error)

	// GetFailureDomainWithContext returns the domain registered into a cluster.
	GetFailureDomainWithContext(ctx context.Context, clusterName, domainName string) (utils.FailureDomainData, error)

	// ListFailureDomains returns all registered domains in cluster.
	ListFailureDomains(clusterName string) (utils.FailureDomainMap, error)

	// ListFailureDomainsWithContext returns all registered domains in cluster.
	ListFailureDomainsWithContext(ctx context.Context, clusterName string) (utils.FailureDomainMap, error)

	// DeleteFailureDomain deletes a domain in cluster.
	DeleteFailureDomain(data utils.FailureDomainData) error

	// DeleteFailureDomainWithContext deletes a domain in cluster.
	DeleteFailureDomainWithContext(ctx context.Context, data utils.FailureDomainData) error

	// UpdateFailureDomain updates a domain into cluster.
	UpdateFailureDomain(data utils.FailureDomainData) error

	// UpdateFailureDomainWithContext updates a domain into cluster.
	UpdateFailureDomainWithContext(ctx context.Context, data utils.FailureDomainData) error
}
// clusters is the concrete value that pulsarClient.Clusters returns as a
// Clusters.
type clusters struct {
// pulsar is the client whose endpoint helper and Client field are used by
// every method to build the URL and issue the request.
pulsar *pulsarClient
// basePath is the leading path element passed to pulsar.endpoint.
basePath string
}
// Clusters returns a Clusters value bound to this client, with "/clusters"
// as the base path for its endpoints.
func (c *pulsarClient) Clusters() Clusters {
	impl := &clusters{basePath: "/clusters", pulsar: c}
	return impl
}
// List is shorthand for ListWithContext called with context.Background().
func (c *clusters) List() ([]string, error) {
	ctx := context.Background()
	return c.ListWithContext(ctx)
}
// ListWithContext issues a GET against the base clusters endpoint and returns
// the decoded string slice together with the error reported by the client.
func (c *clusters) ListWithContext(ctx context.Context) ([]string, error) {
	var names []string
	url := c.pulsar.endpoint(c.basePath)
	err := c.pulsar.Client.GetWithContext(ctx, url, &names)
	return names, err
}
// Get is shorthand for GetWithContext called with context.Background().
func (c *clusters) Get(name string) (utils.ClusterData, error) {
	ctx := context.Background()
	return c.GetWithContext(ctx, name)
}
// GetWithContext issues a GET against <basePath>/<name> and returns the
// resulting utils.ClusterData together with the error reported by the client.
func (c *clusters) GetWithContext(ctx context.Context, name string) (utils.ClusterData, error) {
	var data utils.ClusterData
	url := c.pulsar.endpoint(c.basePath, name)
	err := c.pulsar.Client.GetWithContext(ctx, url, &data)
	return data, err
}
// Create is shorthand for CreateWithContext called with context.Background().
func (c *clusters) Create(cdata utils.ClusterData) error {
	ctx := context.Background()
	return c.CreateWithContext(ctx, cdata)
}
// CreateWithContext issues a PUT against <basePath>/<cdata.Name>, handing
// cdata to the client's PutWithContext, and returns its error.
func (c *clusters) CreateWithContext(ctx context.Context, cdata utils.ClusterData) error {
	url := c.pulsar.endpoint(c.basePath, cdata.Name)
	err := c.pulsar.Client.PutWithContext(ctx, url, &cdata)
	return err
}
// Delete is shorthand for DeleteWithContext called with context.Background().
func (c *clusters) Delete(name string) error {
	ctx := context.Background()
	return c.DeleteWithContext(ctx, name)
}
// DeleteWithContext issues a DELETE against <basePath>/<name> and returns the
// error reported by the client.
func (c *clusters) DeleteWithContext(ctx context.Context, name string) error {
	url := c.pulsar.endpoint(c.basePath, name)
	err := c.pulsar.Client.DeleteWithContext(ctx, url)
	return err
}
// Update is shorthand for UpdateWithContext called with context.Background().
func (c *clusters) Update(cdata utils.ClusterData) error {
	ctx := context.Background()
	return c.UpdateWithContext(ctx, cdata)
}
// UpdateWithContext issues a POST against <basePath>/<cdata.Name>, handing
// cdata to the client's PostWithContext, and returns its error.
func (c *clusters) UpdateWithContext(ctx context.Context, cdata utils.ClusterData) error {
	url := c.pulsar.endpoint(c.basePath, cdata.Name)
	err := c.pulsar.Client.PostWithContext(ctx, url, &cdata)
	return err
}
// GetPeerClusters is shorthand for GetPeerClustersWithContext called with
// context.Background().
func (c *clusters) GetPeerClusters(name string) ([]string, error) {
	ctx := context.Background()
	return c.GetPeerClustersWithContext(ctx, name)
}
// GetPeerClustersWithContext issues a GET against <basePath>/<name>/peers and
// returns the decoded string slice together with the client's error.
func (c *clusters) GetPeerClustersWithContext(ctx context.Context, name string) ([]string, error) {
	var peers []string
	url := c.pulsar.endpoint(c.basePath, name, "peers")
	err := c.pulsar.Client.GetWithContext(ctx, url, &peers)
	return peers, err
}
// UpdatePeerClusters is shorthand for UpdatePeerClustersWithContext called
// with context.Background().
func (c *clusters) UpdatePeerClusters(cluster string, peerClusters []string) error {
	ctx := context.Background()
	return c.UpdatePeerClustersWithContext(ctx, cluster, peerClusters)
}
// UpdatePeerClustersWithContext issues a POST against <basePath>/<cluster>/peers,
// handing peerClusters to the client's PostWithContext, and returns its error.
func (c *clusters) UpdatePeerClustersWithContext(ctx context.Context, cluster string, peerClusters []string) error {
	url := c.pulsar.endpoint(c.basePath, cluster, "peers")
	err := c.pulsar.Client.PostWithContext(ctx, url, peerClusters)
	return err
}
// CreateFailureDomain is shorthand for CreateFailureDomainWithContext called
// with context.Background().
func (c *clusters) CreateFailureDomain(data utils.FailureDomainData) error {
	ctx := context.Background()
	return c.CreateFailureDomainWithContext(ctx, data)
}
// CreateFailureDomainWithContext issues a POST against
// <basePath>/<data.ClusterName>/failureDomains/<data.DomainName>, handing data
// to the client's PostWithContext, and returns its error.
func (c *clusters) CreateFailureDomainWithContext(ctx context.Context, data utils.FailureDomainData) error {
	url := c.pulsar.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName)
	err := c.pulsar.Client.PostWithContext(ctx, url, &data)
	return err
}
// GetFailureDomain is shorthand for GetFailureDomainWithContext called with
// context.Background().
func (c *clusters) GetFailureDomain(clusterName string, domainName string) (utils.FailureDomainData, error) {
	ctx := context.Background()
	return c.GetFailureDomainWithContext(ctx, clusterName, domainName)
}
// GetFailureDomainWithContext issues a GET against
// <basePath>/<clusterName>/failureDomains/<domainName> and returns the
// resulting utils.FailureDomainData together with the client's error.
func (c *clusters) GetFailureDomainWithContext(
	ctx context.Context,
	clusterName string,
	domainName string,
) (utils.FailureDomainData, error) {
	domain := utils.FailureDomainData{}
	url := c.pulsar.endpoint(c.basePath, clusterName, "failureDomains", domainName)
	err := c.pulsar.Client.GetWithContext(ctx, url, &domain)
	return domain, err
}
// ListFailureDomains is shorthand for ListFailureDomainsWithContext called
// with context.Background().
func (c *clusters) ListFailureDomains(clusterName string) (utils.FailureDomainMap, error) {
	ctx := context.Background()
	return c.ListFailureDomainsWithContext(ctx, clusterName)
}
// ListFailureDomainsWithContext issues a GET against
// <basePath>/<clusterName>/failureDomains and returns the resulting
// utils.FailureDomainMap together with the client's error.
func (c *clusters) ListFailureDomainsWithContext(
	ctx context.Context,
	clusterName string,
) (utils.FailureDomainMap, error) {
	var domains utils.FailureDomainMap
	url := c.pulsar.endpoint(c.basePath, clusterName, "failureDomains")
	err := c.pulsar.Client.GetWithContext(ctx, url, &domains)
	return domains, err
}
// DeleteFailureDomain is shorthand for DeleteFailureDomainWithContext called
// with context.Background().
func (c *clusters) DeleteFailureDomain(data utils.FailureDomainData) error {
	ctx := context.Background()
	return c.DeleteFailureDomainWithContext(ctx, data)
}
// DeleteFailureDomainWithContext issues a DELETE against
// <basePath>/<data.ClusterName>/failureDomains/<data.DomainName> and returns
// the error reported by the client. Only the two name fields of data are read.
func (c *clusters) DeleteFailureDomainWithContext(ctx context.Context, data utils.FailureDomainData) error {
	url := c.pulsar.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName)
	err := c.pulsar.Client.DeleteWithContext(ctx, url)
	return err
}
// UpdateFailureDomain is shorthand for UpdateFailureDomainWithContext called
// with context.Background().
func (c *clusters) UpdateFailureDomain(data utils.FailureDomainData) error {
	ctx := context.Background()
	return c.UpdateFailureDomainWithContext(ctx, data)
}
// UpdateFailureDomainWithContext issues a POST against
// <basePath>/<data.ClusterName>/failureDomains/<data.DomainName>, handing data
// to the client's PostWithContext, and returns its error.
func (c *clusters) UpdateFailureDomainWithContext(ctx context.Context, data utils.FailureDomainData) error {
	url := c.pulsar.endpoint(c.basePath, data.ClusterName, "failureDomains", data.DomainName)
	err := c.pulsar.Client.PostWithContext(ctx, url, &data)
	return err
}