blob: de195138984a06f5057bbbf0f955bf40927c4b77 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package admin
import (
"context"
"fmt"
"strconv"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/rest"
"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)
// Topics is admin interface for topics management
type Topics interface {
// Create creates a partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
Create(topic utils.TopicName, partitions int) error
// CreateWithContext creates a partitioned or non-partitioned topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
CreateWithContext(ctx context.Context, topic utils.TopicName, partitions int) error
// CreateWithProperties creates a partitioned or non-partitioned topic with specific properties
//
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
// @param meta
// topic properties
CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error
// CreateWithPropertiesWithContext creates a partitioned or non-partitioned topic with specific properties
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param partitions
// number of topic partitions,
// when setting to 0, it will create a non-partitioned topic
// @param meta
// topic properties
CreateWithPropertiesWithContext(
ctx context.Context,
topic utils.TopicName,
partitions int,
meta map[string]string,
) error
// GetProperties returns the properties of a topic
GetProperties(topic utils.TopicName) (map[string]string, error)
// GetPropertiesWithContext returns the properties of a topic
GetPropertiesWithContext(ctx context.Context, topic utils.TopicName) (map[string]string, error)
// UpdateProperties updates the properties of a topic
UpdateProperties(topic utils.TopicName, properties map[string]string) error
// UpdatePropertiesWithContext updates the properties of a topic
UpdatePropertiesWithContext(ctx context.Context, topic utils.TopicName, properties map[string]string) error
// RemoveProperty removes a property with the given key of a topic
RemoveProperty(topic utils.TopicName, key string) error
// RemovePropertyWithContext removes a property with the given key of a topic
RemovePropertyWithContext(ctx context.Context, topic utils.TopicName, key string) error
// Delete deletes a topic, this function can delete both partitioned or non-partitioned topic
//
// @param topic
// topicName struct
// @param force
// delete topic forcefully
// @param nonPartitioned
// when set to true, topic will be treated as a non-partitioned topic
// Otherwise it will be treated as a partitioned topic
Delete(topic utils.TopicName, force bool, nonPartitioned bool) error
// DeleteWithContext deletes a topic, this function can delete both partitioned or non-partitioned topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param force
// delete topic forcefully
// @param nonPartitioned
// when set to true, topic will be treated as a non-partitioned topic
// Otherwise it will be treated as a partitioned topic
DeleteWithContext(ctx context.Context, topic utils.TopicName, force bool, nonPartitioned bool) error
// Update updates number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
//
// @param topic
// topicName struct
// @param partitions
// number of new partitions of already exist partitioned-topic
Update(topic utils.TopicName, partitions int) error
// UpdateWithContext updates number of partitions of a non-global partitioned topic
// It requires partitioned-topic to be already exist and number of new partitions must be greater than existing
// number of partitions. Decrementing number of partitions requires deletion of topic which is not supported.
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param partitions
// number of new partitions of already exist partitioned-topic
UpdateWithContext(ctx context.Context, topic utils.TopicName, partitions int) error
// GetMetadata returns metadata of a partitioned topic
GetMetadata(utils.TopicName) (utils.PartitionedTopicMetadata, error)
// GetMetadataWithContext returns metadata of a partitioned topic
GetMetadataWithContext(context.Context, utils.TopicName) (utils.PartitionedTopicMetadata, error)
// List returns the list of topics under a namespace
List(utils.NameSpaceName) ([]string, []string, error)
// ListWithContext returns the list of topics under a namespace
ListWithContext(context.Context, utils.NameSpaceName) ([]string, []string, error)
// GetInternalInfo returns the internal metadata info for the topic
GetInternalInfo(utils.TopicName) (utils.ManagedLedgerInfo, error)
// GetInternalInfoWithContext returns the internal metadata info for the topic
GetInternalInfoWithContext(context.Context, utils.TopicName) (utils.ManagedLedgerInfo, error)
// GetPermissions returns permissions on a topic
// Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at the
// namespace level combined (union) with any eventual specific permission set on the topic.
GetPermissions(utils.TopicName) (map[string][]utils.AuthAction, error)
// GetPermissionsWithContext returns permissions on a topic
// Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at the
// namespace level combined (union) with any eventual specific permission set on the topic.
GetPermissionsWithContext(context.Context, utils.TopicName) (map[string][]utils.AuthAction, error)
// GrantPermission grants a new permission to a client role on a single topic
//
// @param topic
// topicName struct
// @param role
// client role to which grant permission
// @param action
// auth actions (e.g. produce and consume)
GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error
// GrantPermissionWithContext grants a new permission to a client role on a single topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param role
// client role to which grant permission
// @param action
// auth actions (e.g. produce and consume)
GrantPermissionWithContext(ctx context.Context, topic utils.TopicName, role string, action []utils.AuthAction) error
// RevokePermission revokes permissions to a client role on a single topic. If the permission
// was not set at the topic level, but rather at the namespace level, this operation will
// return an error (HTTP status code 412).
//
// @param topic
// topicName struct
// @param role
// client role to which remove permissions
RevokePermission(topic utils.TopicName, role string) error
// RevokePermissionWithContext revokes permissions to a client role on a single topic. If the permission
// was not set at the topic level, but rather at the namespace level, this operation will
// return an error (HTTP status code 412).
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param role
// client role to which remove permissions
RevokePermissionWithContext(ctx context.Context, topic utils.TopicName, role string) error
// Lookup returns the broker URL that serves the topic
Lookup(utils.TopicName) (utils.LookupData, error)
// LookupWithContext returns the broker URL that serves the topic
LookupWithContext(context.Context, utils.TopicName) (utils.LookupData, error)
// GetBundleRange returns a bundle range of a topic
GetBundleRange(utils.TopicName) (string, error)
// GetBundleRangeWithContext returns a bundle range of a topic
GetBundleRangeWithContext(context.Context, utils.TopicName) (string, error)
// GetLastMessageID returns the last commit message Id of a topic
GetLastMessageID(utils.TopicName) (utils.MessageID, error)
// GetLastMessageIDWithContext returns the last commit message Id of a topic
GetLastMessageIDWithContext(context.Context, utils.TopicName) (utils.MessageID, error)
// GetMessageID returns the message Id by timestamp(ms) of a topic
//
// @param topic
// topicName struct
// @param timestamp
// absolute timestamp (in ms)
GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error)
// GetMessageIDWithContext returns the message Id by timestamp(ms) of a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param timestamp
// absolute timestamp (in ms)
GetMessageIDWithContext(ctx context.Context, topic utils.TopicName, timestamp int64) (utils.MessageID, error)
// GetStats returns the stats for the topic.
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
GetStats(utils.TopicName) (utils.TopicStats, error)
// GetStatsWithContext returns the stats for the topic.
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
GetStatsWithContext(context.Context, utils.TopicName) (utils.TopicStats, error)
// GetStatsWithOption returns the stats for the topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error)
// GetStatsWithOptionWithContext returns the stats for the topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetStatsWithOptionWithContext(
ctx context.Context,
topic utils.TopicName,
option utils.GetStatsOptions,
) (utils.TopicStats, error)
// GetInternalStats returns the internal stats for the topic.
GetInternalStats(utils.TopicName) (utils.PersistentTopicInternalStats, error)
// GetInternalStatsWithContext returns the internal stats for the topic.
GetInternalStatsWithContext(context.Context, utils.TopicName) (utils.PersistentTopicInternalStats, error)
// GetPartitionedStats returns the stats for the partitioned topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error)
// GetPartitionedStatsWithContext returns the stats for the partitioned topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
GetPartitionedStatsWithContext(
ctx context.Context,
topic utils.TopicName,
perPartition bool,
) (utils.PartitionedTopicStats, error)
// GetPartitionedStatsWithOption returns the stats for the partitioned topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetPartitionedStatsWithOption(
topic utils.TopicName,
perPartition bool,
option utils.GetStatsOptions,
) (utils.PartitionedTopicStats, error)
// GetPartitionedStatsWithOptionWithContext returns the stats for the partitioned topic
//
// All the rates are computed over a 1-minute window and are relative the last completed 1-minute period
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param perPartition
// flag to get stats per partition
// @param option
// request option, e.g. get_precise_backlog or subscription_backlog_size
GetPartitionedStatsWithOptionWithContext(ctx context.Context,
topic utils.TopicName,
perPartition bool,
option utils.GetStatsOptions,
) (utils.PartitionedTopicStats, error)
// Terminate terminates the topic and prevent any more messages being published on it
Terminate(utils.TopicName) (utils.MessageID, error)
// TerminateWithContext terminates the topic and prevent any more messages being published on it
TerminateWithContext(context.Context, utils.TopicName) (utils.MessageID, error)
// Offload triggers offloading messages in topic to longterm storage
Offload(utils.TopicName, utils.MessageID) error
// OffloadWithContext triggers offloading messages in topic to longterm storage
OffloadWithContext(context.Context, utils.TopicName, utils.MessageID) error
// OffloadStatus checks the status of an ongoing offloading operation for a topic
OffloadStatus(utils.TopicName) (utils.OffloadProcessStatus, error)
// OffloadStatusWithContext checks the status of an ongoing offloading operation for a topic
OffloadStatusWithContext(context.Context, utils.TopicName) (utils.OffloadProcessStatus, error)
// Unload a topic
Unload(utils.TopicName) error
// UnloadWithContext a topic
UnloadWithContext(context.Context, utils.TopicName) error
// Compact triggers compaction to run for a topic. A single topic can only have one instance of compaction
// running at any time. Any attempt to trigger another will be met with a ConflictException.
Compact(utils.TopicName) error
// CompactWithContext triggers compaction to run for a topic. A single topic can only have one instance of compaction
// running at any time. Any attempt to trigger another will be met with a ConflictException.
CompactWithContext(context.Context, utils.TopicName) error
// CompactStatus checks the status of an ongoing compaction for a topic
CompactStatus(utils.TopicName) (utils.LongRunningProcessStatus, error)
// CompactStatusWithContext checks the status of an ongoing compaction for a topic
CompactStatusWithContext(context.Context, utils.TopicName) (utils.LongRunningProcessStatus, error)
// GetMessageTTL returns the message TTL for a topic. Returns -1 if not set
GetMessageTTL(utils.TopicName) (int, error)
// GetMessageTTLWithContext returns the message TTL for a topic. Returns -1 if not set
GetMessageTTLWithContext(context.Context, utils.TopicName) (int, error)
// SetMessageTTL sets the message TTL for a topic
//
// @param topic
// topicName struct
// @param messageTTL
// Message TTL in second
SetMessageTTL(topic utils.TopicName, messageTTL int) error
// SetMessageTTLWithContext sets the message TTL for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param messageTTL
// Message TTL in second
SetMessageTTLWithContext(ctx context.Context, topic utils.TopicName, messageTTL int) error
// RemoveMessageTTL removes the message TTL for a topic
RemoveMessageTTL(utils.TopicName) error
// RemoveMessageTTLWithContext removes the message TTL for a topic
RemoveMessageTTLWithContext(context.Context, utils.TopicName) error
// GetMaxProducers Get max number of producers for a topic. Returns -1 if not set
GetMaxProducers(utils.TopicName) (int, error)
// GetMaxProducersWithContext Get max number of producers for a topic. Returns -1 if not set
GetMaxProducersWithContext(context.Context, utils.TopicName) (int, error)
// SetMaxProducers sets max number of producers for a topic
//
// @param topic
// topicName struct
// @param maxProducers
// max number of producer
SetMaxProducers(topic utils.TopicName, maxProducers int) error
// SetMaxProducersWithContext sets max number of producers for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param maxProducers
// max number of producer
SetMaxProducersWithContext(ctx context.Context, topic utils.TopicName, maxProducers int) error
// RemoveMaxProducers removes max number of producers for a topic
RemoveMaxProducers(utils.TopicName) error
// RemoveMaxProducersWithContext removes max number of producers for a topic
RemoveMaxProducersWithContext(context.Context, utils.TopicName) error
// GetMaxConsumers returns max number of consumers for a topic. Returns -1 if not set
GetMaxConsumers(utils.TopicName) (int, error)
// GetMaxConsumersWithContext returns max number of consumers for a topic. Returns -1 if not set
GetMaxConsumersWithContext(context.Context, utils.TopicName) (int, error)
// SetMaxConsumers sets max number of consumers for a topic
//
// @param topic
// topicName struct
// @param maxConsumers
// max number of consumer
SetMaxConsumers(topic utils.TopicName, maxConsumers int) error
// SetMaxConsumersWithContext sets max number of consumers for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param maxConsumers
// max number of consumer
SetMaxConsumersWithContext(ctx context.Context, topic utils.TopicName, maxConsumers int) error
// RemoveMaxConsumers removes max number of consumers for a topic
RemoveMaxConsumers(utils.TopicName) error
// RemoveMaxConsumersWithContext removes max number of consumers for a topic
RemoveMaxConsumersWithContext(context.Context, utils.TopicName) error
// GetMaxUnackMessagesPerConsumer returns max unacked messages policy on consumer for a topic. Returns -1 if not set
GetMaxUnackMessagesPerConsumer(utils.TopicName) (int, error)
// GetMaxUnackMessagesPerConsumerWithContext returns max unacked messages policy on consumer for a topic.
// Returns -1 if not set
GetMaxUnackMessagesPerConsumerWithContext(context.Context, utils.TopicName) (int, error)
// SetMaxUnackMessagesPerConsumer sets max unacked messages policy on consumer for a topic
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on each consumer
SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error
// SetMaxUnackMessagesPerConsumerWithContext sets max unacked messages policy on consumer for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on each consumer
SetMaxUnackMessagesPerConsumerWithContext(ctx context.Context, topic utils.TopicName, maxUnackedNum int) error
// RemoveMaxUnackMessagesPerConsumer removes max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumer(utils.TopicName) error
// RemoveMaxUnackMessagesPerConsumerWithContext removes max unacked messages policy on consumer for a topic
RemoveMaxUnackMessagesPerConsumerWithContext(context.Context, utils.TopicName) error
// GetMaxUnackMessagesPerSubscription returns max unacked messages policy on subscription for a topic.
// Returns -1 if not set
GetMaxUnackMessagesPerSubscription(utils.TopicName) (int, error)
// GetMaxUnackMessagesPerSubscriptionWithContext returns max unacked messages policy on subscription for a topic.
// Returns -1 if not set
GetMaxUnackMessagesPerSubscriptionWithContext(context.Context, utils.TopicName) (int, error)
// SetMaxUnackMessagesPerSubscription sets max unacked messages policy on subscription for a topic
//
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on subscription of a topic
SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error
// SetMaxUnackMessagesPerSubscriptionWithContext sets max unacked messages policy on subscription for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param maxUnackedNum
// max unAcked messages on subscription of a topic
SetMaxUnackMessagesPerSubscriptionWithContext(ctx context.Context, topic utils.TopicName, maxUnackedNum int) error
// RemoveMaxUnackMessagesPerSubscription removes max unacked messages policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscription(utils.TopicName) error
// RemoveMaxUnackMessagesPerSubscriptionWithContext removes max unacked messages policy on subscription for a topic
RemoveMaxUnackMessagesPerSubscriptionWithContext(context.Context, utils.TopicName) error
// GetPersistence returns the persistence policies for a topic.
// Returns nil if the persistence policy is not configured at the topic level.
GetPersistence(utils.TopicName) (*utils.PersistenceData, error)
// GetPersistenceWithContext returns the persistence policies for a topic.
// Returns nil if the persistence policy is not configured at the topic level.
GetPersistenceWithContext(context.Context, utils.TopicName) (*utils.PersistenceData, error)
// SetPersistence sets the persistence policies for a topic
SetPersistence(utils.TopicName, utils.PersistenceData) error
// SetPersistenceWithContext sets the persistence policies for a topic
SetPersistenceWithContext(context.Context, utils.TopicName, utils.PersistenceData) error
// RemovePersistence removes the persistence policies for a topic
RemovePersistence(utils.TopicName) error
// RemovePersistenceWithContext removes the persistence policies for a topic
RemovePersistenceWithContext(context.Context, utils.TopicName) error
// GetDelayedDelivery returns the delayed delivery policy for a topic.
// Returns nil if the delayed delivery policy is not configured at the topic level.
GetDelayedDelivery(utils.TopicName) (*utils.DelayedDeliveryData, error)
// GetDelayedDeliveryWithContext returns the delayed delivery policy for a topic.
// Returns nil if the delayed delivery policy is not configured at the topic level.
GetDelayedDeliveryWithContext(context.Context, utils.TopicName) (*utils.DelayedDeliveryData, error)
// SetDelayedDelivery sets the delayed delivery policy on a topic
SetDelayedDelivery(utils.TopicName, utils.DelayedDeliveryData) error
// SetDelayedDeliveryWithContext sets the delayed delivery policy on a topic
SetDelayedDeliveryWithContext(context.Context, utils.TopicName, utils.DelayedDeliveryData) error
// RemoveDelayedDelivery removes the delayed delivery policy on a topic
RemoveDelayedDelivery(utils.TopicName) error
// RemoveDelayedDeliveryWithContext removes the delayed delivery policy on a topic
RemoveDelayedDeliveryWithContext(context.Context, utils.TopicName) error
// GetDispatchRate returns message dispatch rate for a topic.
// Returns nil if the dispatch rate is not configured at the topic level.
GetDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
// GetDispatchRateWithContext returns message dispatch rate for a topic.
// Returns nil if the dispatch rate is not configured at the topic level.
GetDispatchRateWithContext(context.Context, utils.TopicName) (*utils.DispatchRateData, error)
// SetDispatchRate sets message dispatch rate for a topic
SetDispatchRate(utils.TopicName, utils.DispatchRateData) error
// SetDispatchRateWithContext sets message dispatch rate for a topic
SetDispatchRateWithContext(context.Context, utils.TopicName, utils.DispatchRateData) error
// RemoveDispatchRate removes message dispatch rate for a topic
RemoveDispatchRate(utils.TopicName) error
// RemoveDispatchRateWithContext removes message dispatch rate for a topic
RemoveDispatchRateWithContext(context.Context, utils.TopicName) error
// GetPublishRate returns message publish rate for a topic.
// Returns nil if the publish rate is not configured at the topic level.
GetPublishRate(utils.TopicName) (*utils.PublishRateData, error)
// GetPublishRateWithContext returns message publish rate for a topic.
// Returns nil if the publish rate is not configured at the topic level.
GetPublishRateWithContext(context.Context, utils.TopicName) (*utils.PublishRateData, error)
// SetPublishRate sets message publish rate for a topic
SetPublishRate(utils.TopicName, utils.PublishRateData) error
// SetPublishRateWithContext sets message publish rate for a topic
SetPublishRateWithContext(context.Context, utils.TopicName, utils.PublishRateData) error
// RemovePublishRate removes message publish rate for a topic
RemovePublishRate(utils.TopicName) error
// RemovePublishRateWithContext removes message publish rate for a topic
RemovePublishRateWithContext(context.Context, utils.TopicName) error
// GetDeduplicationStatus returns the deduplication policy for a topic
GetDeduplicationStatus(utils.TopicName) (bool, error)
// GetDeduplicationStatusWithContext returns the deduplication policy for a topic
GetDeduplicationStatusWithContext(context.Context, utils.TopicName) (bool, error)
// SetDeduplicationStatus sets the deduplication policy for a topic
//
// @param topic
// topicName struct
// @param enabled
// set enable or disable deduplication of the topic
SetDeduplicationStatus(topic utils.TopicName, enabled bool) error
// SetDeduplicationStatusWithContext sets the deduplication policy for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param enabled
// set enable or disable deduplication of the topic
SetDeduplicationStatusWithContext(ctx context.Context, topic utils.TopicName, enabled bool) error
// RemoveDeduplicationStatus removes the deduplication policy for a topic
RemoveDeduplicationStatus(utils.TopicName) error
// RemoveDeduplicationStatusWithContext removes the deduplication policy for a topic
RemoveDeduplicationStatusWithContext(context.Context, utils.TopicName) error
// GetRetention returns the retention configuration for a topic.
// Returns nil if the retention policy is not configured at the topic level.
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error)
// GetRetentionWithContext returns the retention configuration for a topic.
// Returns nil if the retention policy is not configured at the topic level.
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetRetentionWithContext(ctx context.Context, topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error)
// RemoveRetention removes the retention configuration on a topic
RemoveRetention(utils.TopicName) error
// RemoveRetentionWithContext removes the retention configuration on a topic
RemoveRetentionWithContext(context.Context, utils.TopicName) error
// SetRetention sets the retention policy for a topic
SetRetention(utils.TopicName, utils.RetentionPolicies) error
// SetRetentionWithContext sets the retention policy for a topic
SetRetentionWithContext(context.Context, utils.TopicName, utils.RetentionPolicies) error
// GetCompactionThreshold returns the compaction threshold for a topic. Returns -1 if not set
//
// i.e. The maximum number of bytes can have before compaction is triggered.
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error)
// GetCompactionThresholdWithContext returns the compaction threshold for a topic. Returns -1 if not set
//
// i.e. The maximum number of bytes can have before compaction is triggered.
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetCompactionThresholdWithContext(ctx context.Context, topic utils.TopicName, applied bool) (int64, error)
// SetCompactionThreshold sets the compaction threshold for a topic
//
// @param topic
// topicName struct
// @param threshold
// maximum number of backlog bytes before compaction is triggered
SetCompactionThreshold(topic utils.TopicName, threshold int64) error
// SetCompactionThresholdWithContext sets the compaction threshold for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param threshold
// maximum number of backlog bytes before compaction is triggered
SetCompactionThresholdWithContext(ctx context.Context, topic utils.TopicName, threshold int64) error
// RemoveCompactionThreshold removes compaction threshold for a topic
RemoveCompactionThreshold(utils.TopicName) error
// RemoveCompactionThresholdWithContext removes compaction threshold for a topic
RemoveCompactionThresholdWithContext(context.Context, utils.TopicName) error
// GetBacklogQuotaMap returns backlog quota map for a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)
// GetBacklogQuotaMapWithContext returns backlog quota map for a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetBacklogQuotaMapWithContext(
ctx context.Context,
topic utils.TopicName,
applied bool,
) (map[utils.BacklogQuotaType]utils.BacklogQuota, error)
// SetBacklogQuota sets a backlog quota for a topic
SetBacklogQuota(utils.TopicName, utils.BacklogQuota, utils.BacklogQuotaType) error
// SetBacklogQuotaWithContext sets a backlog quota for a topic
SetBacklogQuotaWithContext(context.Context, utils.TopicName, utils.BacklogQuota, utils.BacklogQuotaType) error
// RemoveBacklogQuota removes a backlog quota policy from a topic
RemoveBacklogQuota(utils.TopicName, utils.BacklogQuotaType) error
// RemoveBacklogQuotaWithContext removes a backlog quota policy from a topic
RemoveBacklogQuotaWithContext(context.Context, utils.TopicName, utils.BacklogQuotaType) error
// GetInactiveTopicPolicies returns the inactive topic policies on a topic
//
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error)
// GetInactiveTopicPoliciesWithContext returns the inactive topic policies on a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param applied
// when set to true, function will try to find policy applied to this topic
// in namespace or broker level, if no policy set in topic level
GetInactiveTopicPoliciesWithContext(
ctx context.Context,
topic utils.TopicName,
applied bool,
) (utils.InactiveTopicPolicies, error)
// RemoveInactiveTopicPolicies removes inactive topic policies from a topic
RemoveInactiveTopicPolicies(utils.TopicName) error
// RemoveInactiveTopicPoliciesWithContext removes inactive topic policies from a topic
RemoveInactiveTopicPoliciesWithContext(context.Context, utils.TopicName) error
// SetInactiveTopicPolicies sets the inactive topic policies on a topic
SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error
// SetInactiveTopicPoliciesWithContext sets the inactive topic policies on a topic
SetInactiveTopicPoliciesWithContext(ctx context.Context, topic utils.TopicName, data utils.InactiveTopicPolicies) error
// GetReplicationClusters returns the replication clusters of a topic
GetReplicationClusters(topic utils.TopicName) ([]string, error)
// GetReplicationClustersWithContext returns the replication clusters of a topic
GetReplicationClustersWithContext(ctx context.Context, topic utils.TopicName) ([]string, error)
// SetReplicationClusters sets the replication clusters on a topic
//
// @param topic
// topicName struct
// @param data
// list of replication cluster id
SetReplicationClusters(topic utils.TopicName, data []string) error
// SetReplicationClustersWithContext sets the replication clusters on a topic
//
// @param ctx
// context used for the request
// @param topic
// topicName struct
// @param data
// list of replication cluster id
SetReplicationClustersWithContext(ctx context.Context, topic utils.TopicName, data []string) error
// GetSubscribeRate returns subscribe rate configuration for a topic.
// Returns nil if the subscribe rate is not configured at the topic level.
GetSubscribeRate(utils.TopicName) (*utils.SubscribeRate, error)
// GetSubscribeRateWithContext returns subscribe rate configuration for a topic.
// Returns nil if the subscribe rate is not configured at the topic level.
GetSubscribeRateWithContext(context.Context, utils.TopicName) (*utils.SubscribeRate, error)
// SetSubscribeRate sets subscribe rate configuration for a topic
SetSubscribeRate(utils.TopicName, utils.SubscribeRate) error
// SetSubscribeRateWithContext sets subscribe rate configuration for a topic
SetSubscribeRateWithContext(context.Context, utils.TopicName, utils.SubscribeRate) error
// RemoveSubscribeRate removes subscribe rate configuration for a topic
RemoveSubscribeRate(utils.TopicName) error
// RemoveSubscribeRateWithContext removes subscribe rate configuration for a topic
RemoveSubscribeRateWithContext(context.Context, utils.TopicName) error
// GetSubscriptionDispatchRate returns subscription dispatch rate for a topic.
// Returns nil if the subscription dispatch rate is not configured at the topic level.
GetSubscriptionDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
// GetSubscriptionDispatchRateWithContext returns subscription dispatch rate for a topic.
// Returns nil if the subscription dispatch rate is not configured at the topic level.
GetSubscriptionDispatchRateWithContext(context.Context, utils.TopicName) (*utils.DispatchRateData, error)
// SetSubscriptionDispatchRate sets subscription dispatch rate for a topic
SetSubscriptionDispatchRate(utils.TopicName, utils.DispatchRateData) error
// SetSubscriptionDispatchRateWithContext sets subscription dispatch rate for a topic
SetSubscriptionDispatchRateWithContext(context.Context, utils.TopicName, utils.DispatchRateData) error
// RemoveSubscriptionDispatchRate removes subscription dispatch rate for a topic
RemoveSubscriptionDispatchRate(utils.TopicName) error
// RemoveSubscriptionDispatchRateWithContext removes subscription dispatch rate for a topic
RemoveSubscriptionDispatchRateWithContext(context.Context, utils.TopicName) error
// GetMaxConsumersPerSubscription returns max consumers per subscription for a topic. Returns -1 if not set
GetMaxConsumersPerSubscription(utils.TopicName) (int, error)
// GetMaxConsumersPerSubscriptionWithContext returns max consumers per subscription for a topic. Returns -1 if not set
GetMaxConsumersPerSubscriptionWithContext(context.Context, utils.TopicName) (int, error)
// SetMaxConsumersPerSubscription sets max consumers per subscription for a topic
SetMaxConsumersPerSubscription(utils.TopicName, int) error
// SetMaxConsumersPerSubscriptionWithContext sets max consumers per subscription for a topic
SetMaxConsumersPerSubscriptionWithContext(context.Context, utils.TopicName, int) error
// RemoveMaxConsumersPerSubscription removes max consumers per subscription for a topic
RemoveMaxConsumersPerSubscription(utils.TopicName) error
// RemoveMaxConsumersPerSubscriptionWithContext removes max consumers per subscription for a topic
RemoveMaxConsumersPerSubscriptionWithContext(context.Context, utils.TopicName) error
// GetMaxMessageSize returns max message size for a topic. Returns -1 if not set
GetMaxMessageSize(utils.TopicName) (int, error)
// GetMaxMessageSizeWithContext returns max message size for a topic. Returns -1 if not set
GetMaxMessageSizeWithContext(context.Context, utils.TopicName) (int, error)
// SetMaxMessageSize sets max message size for a topic
SetMaxMessageSize(utils.TopicName, int) error
// SetMaxMessageSizeWithContext sets max message size for a topic
SetMaxMessageSizeWithContext(context.Context, utils.TopicName, int) error
// RemoveMaxMessageSize removes max message size for a topic
RemoveMaxMessageSize(utils.TopicName) error
// RemoveMaxMessageSizeWithContext removes max message size for a topic
RemoveMaxMessageSizeWithContext(context.Context, utils.TopicName) error
// GetMaxSubscriptionsPerTopic returns max subscriptions per topic. Returns -1 if not set
GetMaxSubscriptionsPerTopic(utils.TopicName) (int, error)
// GetMaxSubscriptionsPerTopicWithContext returns max subscriptions per topic. Returns -1 if not set
GetMaxSubscriptionsPerTopicWithContext(context.Context, utils.TopicName) (int, error)
// SetMaxSubscriptionsPerTopic sets max subscriptions per topic
SetMaxSubscriptionsPerTopic(utils.TopicName, int) error
// SetMaxSubscriptionsPerTopicWithContext sets max subscriptions per topic
SetMaxSubscriptionsPerTopicWithContext(context.Context, utils.TopicName, int) error
// RemoveMaxSubscriptionsPerTopic removes max subscriptions per topic
RemoveMaxSubscriptionsPerTopic(utils.TopicName) error
// RemoveMaxSubscriptionsPerTopicWithContext removes max subscriptions per topic
RemoveMaxSubscriptionsPerTopicWithContext(context.Context, utils.TopicName) error
// GetSchemaValidationEnforced returns schema validation enforced flag for a topic
GetSchemaValidationEnforced(utils.TopicName) (bool, error)
// GetSchemaValidationEnforcedWithContext returns schema validation enforced flag for a topic
GetSchemaValidationEnforcedWithContext(context.Context, utils.TopicName) (bool, error)
// SetSchemaValidationEnforced sets schema validation enforced flag for a topic
SetSchemaValidationEnforced(utils.TopicName, bool) error
// SetSchemaValidationEnforcedWithContext sets schema validation enforced flag for a topic
SetSchemaValidationEnforcedWithContext(context.Context, utils.TopicName, bool) error
// RemoveSchemaValidationEnforced removes schema validation enforced flag for a topic
RemoveSchemaValidationEnforced(utils.TopicName) error
// RemoveSchemaValidationEnforcedWithContext removes schema validation enforced flag for a topic
RemoveSchemaValidationEnforcedWithContext(context.Context, utils.TopicName) error
// GetDeduplicationSnapshotInterval returns deduplication snapshot interval for a topic. Returns -1 if not set
GetDeduplicationSnapshotInterval(utils.TopicName) (int, error)
// GetDeduplicationSnapshotIntervalWithContext returns deduplication snapshot interval for a topic.
// Returns -1 if not set
GetDeduplicationSnapshotIntervalWithContext(context.Context, utils.TopicName) (int, error)
// SetDeduplicationSnapshotInterval sets deduplication snapshot interval for a topic
SetDeduplicationSnapshotInterval(utils.TopicName, int) error
// SetDeduplicationSnapshotIntervalWithContext sets deduplication snapshot interval for a topic
SetDeduplicationSnapshotIntervalWithContext(context.Context, utils.TopicName, int) error
// RemoveDeduplicationSnapshotInterval removes deduplication snapshot interval for a topic
RemoveDeduplicationSnapshotInterval(utils.TopicName) error
// RemoveDeduplicationSnapshotIntervalWithContext removes deduplication snapshot interval for a topic
RemoveDeduplicationSnapshotIntervalWithContext(context.Context, utils.TopicName) error
// GetReplicatorDispatchRate returns replicator dispatch rate for a topic.
// Returns nil if the replicator dispatch rate is not configured at the topic level.
GetReplicatorDispatchRate(utils.TopicName) (*utils.DispatchRateData, error)
// GetReplicatorDispatchRateWithContext returns replicator dispatch rate for a topic.
// Returns nil if the replicator dispatch rate is not configured at the topic level.
GetReplicatorDispatchRateWithContext(context.Context, utils.TopicName) (*utils.DispatchRateData, error)
// SetReplicatorDispatchRate sets replicator dispatch rate for a topic
SetReplicatorDispatchRate(utils.TopicName, utils.DispatchRateData) error
// SetReplicatorDispatchRateWithContext sets replicator dispatch rate for a topic
SetReplicatorDispatchRateWithContext(context.Context, utils.TopicName, utils.DispatchRateData) error
// RemoveReplicatorDispatchRate removes replicator dispatch rate for a topic
RemoveReplicatorDispatchRate(utils.TopicName) error
// RemoveReplicatorDispatchRateWithContext removes replicator dispatch rate for a topic
RemoveReplicatorDispatchRateWithContext(context.Context, utils.TopicName) error
// GetOffloadPolicies returns offload policies for a topic.
// Returns nil if the offload policies are not configured at the topic level.
GetOffloadPolicies(utils.TopicName) (*utils.OffloadPolicies, error)
// GetOffloadPoliciesWithContext returns offload policies for a topic.
// Returns nil if the offload policies are not configured at the topic level.
GetOffloadPoliciesWithContext(context.Context, utils.TopicName) (*utils.OffloadPolicies, error)
// SetOffloadPolicies sets offload policies for a topic
SetOffloadPolicies(utils.TopicName, utils.OffloadPolicies) error
// SetOffloadPoliciesWithContext sets offload policies for a topic
SetOffloadPoliciesWithContext(context.Context, utils.TopicName, utils.OffloadPolicies) error
// RemoveOffloadPolicies removes offload policies for a topic
RemoveOffloadPolicies(utils.TopicName) error
// RemoveOffloadPoliciesWithContext removes offload policies for a topic
RemoveOffloadPoliciesWithContext(context.Context, utils.TopicName) error
// GetAutoSubscriptionCreation returns auto subscription creation override for a topic.
// Returns nil if the auto subscription creation override is not configured at the topic level.
GetAutoSubscriptionCreation(utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error)
// GetAutoSubscriptionCreationWithContext returns auto subscription creation override for a topic.
// Returns nil if the auto subscription creation override is not configured at the topic level.
GetAutoSubscriptionCreationWithContext(
context.Context,
utils.TopicName,
) (*utils.AutoSubscriptionCreationOverride, error)
// SetAutoSubscriptionCreation sets auto subscription creation override for a topic
SetAutoSubscriptionCreation(utils.TopicName,
utils.AutoSubscriptionCreationOverride) error
// SetAutoSubscriptionCreationWithContext sets auto subscription creation override for a topic
SetAutoSubscriptionCreationWithContext(context.Context, utils.TopicName,
utils.AutoSubscriptionCreationOverride) error
// RemoveAutoSubscriptionCreation Remove auto subscription creation override for a topic
RemoveAutoSubscriptionCreation(utils.TopicName) error
// RemoveAutoSubscriptionCreationWithContext Remove auto subscription creation override for a topic
RemoveAutoSubscriptionCreationWithContext(context.Context, utils.TopicName) error
// GetSchemaCompatibilityStrategy returns schema compatibility strategy for a topic
GetSchemaCompatibilityStrategy(utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
// GetSchemaCompatibilityStrategyWithContext returns schema compatibility strategy for a topic
GetSchemaCompatibilityStrategyWithContext(context.Context, utils.TopicName) (utils.SchemaCompatibilityStrategy, error)
// SetSchemaCompatibilityStrategy sets schema compatibility strategy for a topic
SetSchemaCompatibilityStrategy(utils.TopicName,
utils.SchemaCompatibilityStrategy) error
// SetSchemaCompatibilityStrategyWithContext sets schema compatibility strategy for a topic
SetSchemaCompatibilityStrategyWithContext(context.Context, utils.TopicName,
utils.SchemaCompatibilityStrategy) error
// RemoveSchemaCompatibilityStrategy removes schema compatibility strategy for a topic
RemoveSchemaCompatibilityStrategy(utils.TopicName) error
// RemoveSchemaCompatibilityStrategyWithContext removes schema compatibility strategy for a topic
RemoveSchemaCompatibilityStrategyWithContext(context.Context, utils.TopicName) error
}
type topics struct {
pulsar *pulsarClient
basePath string
persistentPath string
nonPersistentPath string
lookupPath string
}
// Check whether the topics struct implements the Topics interface.
var _ Topics = &topics{}
// Topics is used to access the topics endpoints
func (c *pulsarClient) Topics() Topics {
return &topics{
pulsar: c,
basePath: "",
persistentPath: "/persistent",
nonPersistentPath: "/non-persistent",
lookupPath: "/lookup/v2/topic",
}
}
func (t *topics) Create(topic utils.TopicName, partitions int) error {
return t.CreateWithContext(context.Background(), topic, partitions)
}
func (t *topics) CreateWithContext(ctx context.Context, topic utils.TopicName, partitions int) error {
return t.CreateWithPropertiesWithContext(ctx, topic, partitions, nil)
}
func (t *topics) CreateWithProperties(topic utils.TopicName, partitions int, meta map[string]string) error {
return t.CreateWithPropertiesWithContext(context.Background(), topic, partitions, meta)
}
func (t *topics) CreateWithPropertiesWithContext(
ctx context.Context,
topic utils.TopicName,
partitions int,
meta map[string]string,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
if partitions == 0 {
endpoint = t.pulsar.endpoint(t.basePath, topic.GetRestPath())
return t.pulsar.Client.PutWithContext(ctx, endpoint, meta)
}
data := struct {
Meta map[string]string `json:"properties"`
Partitions int `json:"partitions"`
}{
Meta: meta,
Partitions: partitions,
}
return t.pulsar.Client.PutWithCustomMediaTypeWithContext(ctx, endpoint, &data, nil, nil, rest.PartitionedTopicMetaJSON)
}
func (t *topics) GetProperties(topic utils.TopicName) (map[string]string, error) {
return t.GetPropertiesWithContext(context.Background(), topic)
}
func (t *topics) GetPropertiesWithContext(ctx context.Context, topic utils.TopicName) (map[string]string, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
var properties map[string]string
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &properties)
return properties, err
}
func (t *topics) UpdateProperties(topic utils.TopicName, properties map[string]string) error {
return t.UpdatePropertiesWithContext(context.Background(), topic, properties)
}
func (t *topics) UpdatePropertiesWithContext(
ctx context.Context,
topic utils.TopicName,
properties map[string]string,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
return t.pulsar.Client.PutWithContext(ctx, endpoint, properties)
}
func (t *topics) RemoveProperty(topic utils.TopicName, key string) error {
return t.RemovePropertyWithContext(context.Background(), topic, key)
}
func (t *topics) RemovePropertyWithContext(ctx context.Context, topic utils.TopicName, key string) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "properties")
return t.pulsar.Client.DeleteWithQueryParamsWithContext(ctx, endpoint, map[string]string{"key": key})
}
func (t *topics) Delete(topic utils.TopicName, force bool, nonPartitioned bool) error {
return t.DeleteWithContext(context.Background(), topic, force, nonPartitioned)
}
func (t *topics) DeleteWithContext(ctx context.Context, topic utils.TopicName, force bool, nonPartitioned bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
if nonPartitioned {
endpoint = t.pulsar.endpoint(t.basePath, topic.GetRestPath())
}
params := map[string]string{
"force": strconv.FormatBool(force),
}
return t.pulsar.Client.DeleteWithQueryParamsWithContext(ctx, endpoint, params)
}
func (t *topics) Update(topic utils.TopicName, partitions int) error {
return t.UpdateWithContext(context.Background(), topic, partitions)
}
func (t *topics) UpdateWithContext(ctx context.Context, topic utils.TopicName, partitions int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
return t.pulsar.Client.PostWithContext(ctx, endpoint, partitions)
}
func (t *topics) GetMetadata(topic utils.TopicName) (utils.PartitionedTopicMetadata, error) {
return t.GetMetadataWithContext(context.Background(), topic)
}
func (t *topics) GetMetadataWithContext(
ctx context.Context,
topic utils.TopicName,
) (utils.PartitionedTopicMetadata, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitions")
var partitionedMeta utils.PartitionedTopicMetadata
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &partitionedMeta)
return partitionedMeta, err
}
func (t *topics) List(namespace utils.NameSpaceName) ([]string, []string, error) {
return t.ListWithContext(context.Background(), namespace)
}
func (t *topics) ListWithContext(ctx context.Context, namespace utils.NameSpaceName) ([]string, []string, error) {
var partitionedTopics, nonPartitionedTopics []string
partitionedTopicsChan := make(chan []string)
nonPartitionedTopicsChan := make(chan []string)
errChan := make(chan error)
pp := t.pulsar.endpoint(t.persistentPath, namespace.String(), "partitioned")
np := t.pulsar.endpoint(t.nonPersistentPath, namespace.String(), "partitioned")
p := t.pulsar.endpoint(t.persistentPath, namespace.String())
n := t.pulsar.endpoint(t.nonPersistentPath, namespace.String())
go t.getTopics(ctx, pp, partitionedTopicsChan, errChan)
go t.getTopics(ctx, np, partitionedTopicsChan, errChan)
go t.getTopics(ctx, p, nonPartitionedTopicsChan, errChan)
go t.getTopics(ctx, n, nonPartitionedTopicsChan, errChan)
requestCount := 4
for {
select {
case err := <-errChan:
if err != nil {
return nil, nil, err
}
continue
case pTopic := <-partitionedTopicsChan:
requestCount--
partitionedTopics = append(partitionedTopics, pTopic...)
case npTopic := <-nonPartitionedTopicsChan:
requestCount--
nonPartitionedTopics = append(nonPartitionedTopics, npTopic...)
}
if requestCount == 0 {
break
}
}
return partitionedTopics, nonPartitionedTopics, nil
}
func (t *topics) getTopics(ctx context.Context, endpoint string, out chan<- []string, err chan<- error) {
var topics []string
err <- t.pulsar.Client.GetWithContext(ctx, endpoint, &topics)
out <- topics
}
func (t *topics) GetInternalInfo(topic utils.TopicName) (utils.ManagedLedgerInfo, error) {
return t.GetInternalInfoWithContext(context.Background(), topic)
}
func (t *topics) GetInternalInfoWithContext(
ctx context.Context,
topic utils.TopicName,
) (utils.ManagedLedgerInfo, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "internal-info")
var info utils.ManagedLedgerInfo
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &info)
return info, err
}
func (t *topics) GetPermissions(topic utils.TopicName) (map[string][]utils.AuthAction, error) {
return t.GetPermissionsWithContext(context.Background(), topic)
}
func (t *topics) GetPermissionsWithContext(
ctx context.Context,
topic utils.TopicName,
) (map[string][]utils.AuthAction, error) {
var permissions map[string][]utils.AuthAction
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "permissions")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &permissions)
return permissions, err
}
func (t *topics) GrantPermission(topic utils.TopicName, role string, action []utils.AuthAction) error {
return t.GrantPermissionWithContext(context.Background(), topic, role, action)
}
func (t *topics) GrantPermissionWithContext(
ctx context.Context,
topic utils.TopicName,
role string,
action []utils.AuthAction,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "permissions", role)
s := []string{}
for _, v := range action {
s = append(s, v.String())
}
return t.pulsar.Client.PostWithContext(ctx, endpoint, s)
}
func (t *topics) RevokePermission(topic utils.TopicName, role string) error {
return t.RevokePermissionWithContext(context.Background(), topic, role)
}
func (t *topics) RevokePermissionWithContext(ctx context.Context, topic utils.TopicName, role string) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "permissions", role)
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) Lookup(topic utils.TopicName) (utils.LookupData, error) {
return t.LookupWithContext(context.Background(), topic)
}
func (t *topics) LookupWithContext(ctx context.Context, topic utils.TopicName) (utils.LookupData, error) {
var lookup utils.LookupData
endpoint := fmt.Sprintf("%s/%s", t.lookupPath, topic.GetRestPath())
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &lookup)
return lookup, err
}
func (t *topics) GetBundleRange(topic utils.TopicName) (string, error) {
return t.GetBundleRangeWithContext(context.Background(), topic)
}
func (t *topics) GetBundleRangeWithContext(ctx context.Context, topic utils.TopicName) (string, error) {
endpoint := fmt.Sprintf("%s/%s/%s", t.lookupPath, topic.GetRestPath(), "bundle")
data, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, nil, nil, false)
return string(data), err
}
func (t *topics) GetLastMessageID(topic utils.TopicName) (utils.MessageID, error) {
return t.GetLastMessageIDWithContext(context.Background(), topic)
}
func (t *topics) GetLastMessageIDWithContext(ctx context.Context, topic utils.TopicName) (utils.MessageID, error) {
var messageID utils.MessageID
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "lastMessageId")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &messageID)
return messageID, err
}
func (t *topics) GetMessageID(topic utils.TopicName, timestamp int64) (utils.MessageID, error) {
return t.GetMessageIDWithContext(context.Background(), topic, timestamp)
}
func (t *topics) GetMessageIDWithContext(
ctx context.Context,
topic utils.TopicName,
timestamp int64,
) (utils.MessageID, error) {
var messageID utils.MessageID
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageid", strconv.FormatInt(timestamp, 10))
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &messageID)
return messageID, err
}
func (t *topics) GetStats(topic utils.TopicName) (utils.TopicStats, error) {
return t.GetStatsWithContext(context.Background(), topic)
}
func (t *topics) GetStatsWithContext(ctx context.Context, topic utils.TopicName) (utils.TopicStats, error) {
var stats utils.TopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &stats)
return stats, err
}
func (t *topics) GetStatsWithOption(topic utils.TopicName, option utils.GetStatsOptions) (utils.TopicStats, error) {
return t.GetStatsWithOptionWithContext(context.Background(), topic, option)
}
func (t *topics) GetStatsWithOptionWithContext(
ctx context.Context,
topic utils.TopicName,
option utils.GetStatsOptions,
) (utils.TopicStats, error) {
var stats utils.TopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "stats")
params := map[string]string{
"getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog),
"subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize),
"getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog),
"excludePublishers": strconv.FormatBool(option.ExcludePublishers),
"excludeConsumers": strconv.FormatBool(option.ExcludeConsumers),
}
_, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &stats, params, true)
return stats, err
}
func (t *topics) GetInternalStats(topic utils.TopicName) (utils.PersistentTopicInternalStats, error) {
return t.GetInternalStatsWithContext(context.Background(), topic)
}
func (t *topics) GetInternalStatsWithContext(
ctx context.Context,
topic utils.TopicName,
) (utils.PersistentTopicInternalStats, error) {
var stats utils.PersistentTopicInternalStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "internalStats")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &stats)
return stats, err
}
func (t *topics) GetPartitionedStats(topic utils.TopicName, perPartition bool) (utils.PartitionedTopicStats, error) {
return t.GetPartitionedStatsWithContext(context.Background(), topic, perPartition)
}
func (t *topics) GetPartitionedStatsWithContext(
ctx context.Context,
topic utils.TopicName,
perPartition bool,
) (utils.PartitionedTopicStats, error) {
var stats utils.PartitionedTopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitioned-stats")
params := map[string]string{
"perPartition": strconv.FormatBool(perPartition),
}
_, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &stats, params, true)
return stats, err
}
func (t *topics) GetPartitionedStatsWithOption(topic utils.TopicName, perPartition bool,
option utils.GetStatsOptions) (utils.PartitionedTopicStats, error) {
return t.GetPartitionedStatsWithOptionWithContext(context.Background(), topic, perPartition, option)
}
func (t *topics) GetPartitionedStatsWithOptionWithContext(ctx context.Context, topic utils.TopicName, perPartition bool,
option utils.GetStatsOptions) (utils.PartitionedTopicStats, error) {
var stats utils.PartitionedTopicStats
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "partitioned-stats")
params := map[string]string{
"perPartition": strconv.FormatBool(perPartition),
"getPreciseBacklog": strconv.FormatBool(option.GetPreciseBacklog),
"subscriptionBacklogSize": strconv.FormatBool(option.SubscriptionBacklogSize),
"getEarliestTimeInBacklog": strconv.FormatBool(option.GetEarliestTimeInBacklog),
"excludePublishers": strconv.FormatBool(option.ExcludePublishers),
"excludeConsumers": strconv.FormatBool(option.ExcludeConsumers),
}
_, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &stats, params, true)
return stats, err
}
func (t *topics) Terminate(topic utils.TopicName) (utils.MessageID, error) {
return t.TerminateWithContext(context.Background(), topic)
}
func (t *topics) TerminateWithContext(ctx context.Context, topic utils.TopicName) (utils.MessageID, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "terminate")
var messageID utils.MessageID
err := t.pulsar.Client.PostWithObjWithContext(ctx, endpoint, nil, &messageID)
return messageID, err
}
func (t *topics) Offload(topic utils.TopicName, messageID utils.MessageID) error {
return t.OffloadWithContext(context.Background(), topic, messageID)
}
func (t *topics) OffloadWithContext(ctx context.Context, topic utils.TopicName, messageID utils.MessageID) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offload")
return t.pulsar.Client.PutWithContext(ctx, endpoint, messageID)
}
func (t *topics) OffloadStatus(topic utils.TopicName) (utils.OffloadProcessStatus, error) {
return t.OffloadStatusWithContext(context.Background(), topic)
}
func (t *topics) OffloadStatusWithContext(
ctx context.Context,
topic utils.TopicName,
) (utils.OffloadProcessStatus, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offload")
var status utils.OffloadProcessStatus
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &status)
return status, err
}
func (t *topics) Unload(topic utils.TopicName) error {
return t.UnloadWithContext(context.Background(), topic)
}
func (t *topics) UnloadWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "unload")
return t.pulsar.Client.PutWithContext(ctx, endpoint, nil)
}
func (t *topics) Compact(topic utils.TopicName) error {
return t.CompactWithContext(context.Background(), topic)
}
func (t *topics) CompactWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compaction")
return t.pulsar.Client.PutWithContext(ctx, endpoint, nil)
}
func (t *topics) CompactStatus(topic utils.TopicName) (utils.LongRunningProcessStatus, error) {
return t.CompactStatusWithContext(context.Background(), topic)
}
func (t *topics) CompactStatusWithContext(
ctx context.Context,
topic utils.TopicName,
) (utils.LongRunningProcessStatus, error) {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compaction")
var status utils.LongRunningProcessStatus
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &status)
return status, err
}
func (t *topics) GetMessageTTL(topic utils.TopicName) (int, error) {
return t.GetMessageTTLWithContext(context.Background(), topic)
}
func (t *topics) GetMessageTTLWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var ttl = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &ttl)
return ttl, err
}
func (t *topics) SetMessageTTL(topic utils.TopicName, messageTTL int) error {
return t.SetMessageTTLWithContext(context.Background(), topic, messageTTL)
}
func (t *topics) SetMessageTTLWithContext(ctx context.Context, topic utils.TopicName, messageTTL int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
var params = make(map[string]string)
params["messageTTL"] = strconv.Itoa(messageTTL)
err := t.pulsar.Client.PostWithQueryParamsWithContext(ctx, endpoint, nil, params)
return err
}
func (t *topics) RemoveMessageTTL(topic utils.TopicName) error {
return t.RemoveMessageTTLWithContext(context.Background(), topic)
}
func (t *topics) RemoveMessageTTLWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "messageTTL")
var params = make(map[string]string)
params["messageTTL"] = strconv.Itoa(0)
err := t.pulsar.Client.DeleteWithQueryParamsWithContext(ctx, endpoint, params)
return err
}
func (t *topics) GetMaxProducers(topic utils.TopicName) (int, error) {
return t.GetMaxProducersWithContext(context.Background(), topic)
}
func (t *topics) GetMaxProducersWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var maxProducers = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxProducers")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxProducers)
return maxProducers, err
}
func (t *topics) SetMaxProducers(topic utils.TopicName, maxProducers int) error {
return t.SetMaxProducersWithContext(context.Background(), topic, maxProducers)
}
func (t *topics) SetMaxProducersWithContext(ctx context.Context, topic utils.TopicName, maxProducers int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxProducers")
err := t.pulsar.Client.PostWithContext(ctx, endpoint, &maxProducers)
return err
}
func (t *topics) RemoveMaxProducers(topic utils.TopicName) error {
return t.RemoveMaxProducersWithContext(context.Background(), topic)
}
func (t *topics) RemoveMaxProducersWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxProducers")
err := t.pulsar.Client.DeleteWithContext(ctx, endpoint)
return err
}
func (t *topics) GetMaxConsumers(topic utils.TopicName) (int, error) {
return t.GetMaxConsumersWithContext(context.Background(), topic)
}
func (t *topics) GetMaxConsumersWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var maxConsumers = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
return maxConsumers, err
}
func (t *topics) SetMaxConsumers(topic utils.TopicName, maxConsumers int) error {
return t.SetMaxConsumersWithContext(context.Background(), topic, maxConsumers)
}
func (t *topics) SetMaxConsumersWithContext(ctx context.Context, topic utils.TopicName, maxConsumers int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers")
err := t.pulsar.Client.PostWithContext(ctx, endpoint, &maxConsumers)
return err
}
func (t *topics) RemoveMaxConsumers(topic utils.TopicName) error {
return t.RemoveMaxConsumersWithContext(context.Background(), topic)
}
func (t *topics) RemoveMaxConsumersWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumers")
err := t.pulsar.Client.DeleteWithContext(ctx, endpoint)
return err
}
func (t *topics) GetMaxUnackMessagesPerConsumer(topic utils.TopicName) (int, error) {
return t.GetMaxUnackMessagesPerConsumerWithContext(context.Background(), topic)
}
func (t *topics) GetMaxUnackMessagesPerConsumerWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var maxNum = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
return maxNum, err
}
func (t *topics) SetMaxUnackMessagesPerConsumer(topic utils.TopicName, maxUnackedNum int) error {
return t.SetMaxUnackMessagesPerConsumerWithContext(context.Background(), topic, maxUnackedNum)
}
func (t *topics) SetMaxUnackMessagesPerConsumerWithContext(
ctx context.Context,
topic utils.TopicName,
maxUnackedNum int,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &maxUnackedNum)
}
func (t *topics) RemoveMaxUnackMessagesPerConsumer(topic utils.TopicName) error {
return t.RemoveMaxUnackMessagesPerConsumerWithContext(context.Background(), topic)
}
func (t *topics) RemoveMaxUnackMessagesPerConsumerWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnConsumer")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetMaxUnackMessagesPerSubscription(topic utils.TopicName) (int, error) {
return t.GetMaxUnackMessagesPerSubscriptionWithContext(context.Background(), topic)
}
func (t *topics) GetMaxUnackMessagesPerSubscriptionWithContext(
ctx context.Context,
topic utils.TopicName,
) (int, error) {
var maxNum = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnSubscription")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxNum)
return maxNum, err
}
func (t *topics) SetMaxUnackMessagesPerSubscription(topic utils.TopicName, maxUnackedNum int) error {
return t.SetMaxUnackMessagesPerSubscriptionWithContext(context.Background(), topic, maxUnackedNum)
}
func (t *topics) SetMaxUnackMessagesPerSubscriptionWithContext(
ctx context.Context,
topic utils.TopicName,
maxUnackedNum int,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnSubscription")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &maxUnackedNum)
}
func (t *topics) RemoveMaxUnackMessagesPerSubscription(topic utils.TopicName) error {
return t.RemoveMaxUnackMessagesPerSubscriptionWithContext(context.Background(), topic)
}
func (t *topics) RemoveMaxUnackMessagesPerSubscriptionWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxUnackedMessagesOnSubscription")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetPersistence(topic utils.TopicName) (*utils.PersistenceData, error) {
return t.GetPersistenceWithContext(context.Background(), topic)
}
func (t *topics) GetPersistenceWithContext(ctx context.Context, topic utils.TopicName) (*utils.PersistenceData, error) {
var persistenceData utils.PersistenceData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &persistenceData)
if body != nil {
return &persistenceData, err
}
return nil, err
}
func (t *topics) SetPersistence(topic utils.TopicName, persistenceData utils.PersistenceData) error {
return t.SetPersistenceWithContext(context.Background(), topic, persistenceData)
}
func (t *topics) SetPersistenceWithContext(
ctx context.Context,
topic utils.TopicName,
persistenceData utils.PersistenceData,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &persistenceData)
}
func (t *topics) RemovePersistence(topic utils.TopicName) error {
return t.RemovePersistenceWithContext(context.Background(), topic)
}
func (t *topics) RemovePersistenceWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "persistence")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetDelayedDelivery(topic utils.TopicName) (*utils.DelayedDeliveryData, error) {
return t.GetDelayedDeliveryWithContext(context.Background(), topic)
}
func (t *topics) GetDelayedDeliveryWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DelayedDeliveryData, error) {
var delayedDeliveryData utils.DelayedDeliveryData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &delayedDeliveryData)
if body != nil {
return &delayedDeliveryData, err
}
return nil, err
}
func (t *topics) SetDelayedDelivery(topic utils.TopicName, delayedDeliveryData utils.DelayedDeliveryData) error {
return t.SetDelayedDeliveryWithContext(context.Background(), topic, delayedDeliveryData)
}
func (t *topics) SetDelayedDeliveryWithContext(
ctx context.Context,
topic utils.TopicName,
delayedDeliveryData utils.DelayedDeliveryData,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &delayedDeliveryData)
}
func (t *topics) RemoveDelayedDelivery(topic utils.TopicName) error {
return t.RemoveDelayedDeliveryWithContext(context.Background(), topic)
}
func (t *topics) RemoveDelayedDeliveryWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "delayedDelivery")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
return t.GetDispatchRateWithContext(context.Background(), topic)
}
func (t *topics) GetDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DispatchRateData, error) {
var dispatchRateData utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &dispatchRateData)
if body != nil {
return &dispatchRateData, err
}
return nil, err
}
func (t *topics) SetDispatchRate(topic utils.TopicName, dispatchRateData utils.DispatchRateData) error {
return t.SetDispatchRateWithContext(context.Background(), topic, dispatchRateData)
}
func (t *topics) SetDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
dispatchRateData utils.DispatchRateData,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &dispatchRateData)
}
func (t *topics) RemoveDispatchRate(topic utils.TopicName) error {
return t.RemoveDispatchRateWithContext(context.Background(), topic)
}
func (t *topics) RemoveDispatchRateWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "dispatchRate")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetPublishRate(topic utils.TopicName) (*utils.PublishRateData, error) {
return t.GetPublishRateWithContext(context.Background(), topic)
}
func (t *topics) GetPublishRateWithContext(ctx context.Context, topic utils.TopicName) (*utils.PublishRateData, error) {
var publishRateData utils.PublishRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "publishRate")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &publishRateData)
if body != nil {
return &publishRateData, err
}
return nil, err
}
func (t *topics) SetPublishRate(topic utils.TopicName, publishRateData utils.PublishRateData) error {
return t.SetPublishRateWithContext(context.Background(), topic, publishRateData)
}
func (t *topics) SetPublishRateWithContext(
ctx context.Context,
topic utils.TopicName,
publishRateData utils.PublishRateData,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "publishRate")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &publishRateData)
}
func (t *topics) RemovePublishRate(topic utils.TopicName) error {
return t.RemovePublishRateWithContext(context.Background(), topic)
}
func (t *topics) RemovePublishRateWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "publishRate")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetDeduplicationStatus(topic utils.TopicName) (bool, error) {
return t.GetDeduplicationStatusWithContext(context.Background(), topic)
}
func (t *topics) GetDeduplicationStatusWithContext(ctx context.Context, topic utils.TopicName) (bool, error) {
var enabled bool
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &enabled)
return enabled, err
}
func (t *topics) SetDeduplicationStatus(topic utils.TopicName, enabled bool) error {
return t.SetDeduplicationStatusWithContext(context.Background(), topic, enabled)
}
func (t *topics) SetDeduplicationStatusWithContext(ctx context.Context, topic utils.TopicName, enabled bool) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.PostWithContext(ctx, endpoint, enabled)
}
func (t *topics) RemoveDeduplicationStatus(topic utils.TopicName) error {
return t.RemoveDeduplicationStatusWithContext(context.Background(), topic)
}
func (t *topics) RemoveDeduplicationStatusWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationEnabled")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetRetention(topic utils.TopicName, applied bool) (*utils.RetentionPolicies, error) {
return t.GetRetentionWithContext(context.Background(), topic, applied)
}
func (t *topics) GetRetentionWithContext(
ctx context.Context,
topic utils.TopicName,
applied bool,
) (*utils.RetentionPolicies, error) {
var policy utils.RetentionPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention")
body, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &policy, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
if body != nil {
return &policy, err
}
return nil, err
}
func (t *topics) RemoveRetention(topic utils.TopicName) error {
return t.RemoveRetentionWithContext(context.Background(), topic)
}
func (t *topics) RemoveRetentionWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) SetRetention(topic utils.TopicName, data utils.RetentionPolicies) error {
return t.SetRetentionWithContext(context.Background(), topic, data)
}
func (t *topics) SetRetentionWithContext(
ctx context.Context,
topic utils.TopicName,
data utils.RetentionPolicies,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "retention")
return t.pulsar.Client.PostWithContext(ctx, endpoint, data)
}
func (t *topics) GetCompactionThreshold(topic utils.TopicName, applied bool) (int64, error) {
return t.GetCompactionThresholdWithContext(context.Background(), topic, applied)
}
func (t *topics) GetCompactionThresholdWithContext(
ctx context.Context,
topic utils.TopicName,
applied bool,
) (int64, error) {
var threshold int64 = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
_, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &threshold, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return threshold, err
}
func (t *topics) SetCompactionThreshold(topic utils.TopicName, threshold int64) error {
return t.SetCompactionThresholdWithContext(context.Background(), topic, threshold)
}
func (t *topics) SetCompactionThresholdWithContext(ctx context.Context, topic utils.TopicName, threshold int64) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
err := t.pulsar.Client.PostWithContext(ctx, endpoint, threshold)
return err
}
func (t *topics) RemoveCompactionThreshold(topic utils.TopicName) error {
return t.RemoveCompactionThresholdWithContext(context.Background(), topic)
}
func (t *topics) RemoveCompactionThresholdWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "compactionThreshold")
err := t.pulsar.Client.DeleteWithContext(ctx, endpoint)
return err
}
func (t *topics) GetBacklogQuotaMap(topic utils.TopicName, applied bool) (map[utils.BacklogQuotaType]utils.BacklogQuota,
error) {
return t.GetBacklogQuotaMapWithContext(context.Background(), topic, applied)
}
func (t *topics) GetBacklogQuotaMapWithContext(
ctx context.Context,
topic utils.TopicName,
applied bool,
) (map[utils.BacklogQuotaType]utils.BacklogQuota,
error) {
var backlogQuotaMap map[utils.BacklogQuotaType]utils.BacklogQuota
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuotaMap")
queryParams := map[string]string{"applied": strconv.FormatBool(applied)}
_, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &backlogQuotaMap, queryParams, true)
return backlogQuotaMap, err
}
func (t *topics) SetBacklogQuota(topic utils.TopicName, backlogQuota utils.BacklogQuota,
backlogQuotaType utils.BacklogQuotaType) error {
return t.SetBacklogQuotaWithContext(context.Background(), topic, backlogQuota, backlogQuotaType)
}
func (t *topics) SetBacklogQuotaWithContext(ctx context.Context, topic utils.TopicName, backlogQuota utils.BacklogQuota,
backlogQuotaType utils.BacklogQuotaType) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota")
params := make(map[string]string)
params["backlogQuotaType"] = string(backlogQuotaType)
return t.pulsar.Client.PostWithQueryParamsWithContext(ctx, endpoint, &backlogQuota, params)
}
func (t *topics) RemoveBacklogQuota(topic utils.TopicName, backlogQuotaType utils.BacklogQuotaType) error {
return t.RemoveBacklogQuotaWithContext(context.Background(), topic, backlogQuotaType)
}
func (t *topics) RemoveBacklogQuotaWithContext(
ctx context.Context,
topic utils.TopicName,
backlogQuotaType utils.BacklogQuotaType,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "backlogQuota")
return t.pulsar.Client.DeleteWithQueryParamsWithContext(ctx, endpoint, map[string]string{
"backlogQuotaType": string(backlogQuotaType),
})
}
func (t *topics) GetInactiveTopicPolicies(topic utils.TopicName, applied bool) (utils.InactiveTopicPolicies, error) {
return t.GetInactiveTopicPoliciesWithContext(context.Background(), topic, applied)
}
func (t *topics) GetInactiveTopicPoliciesWithContext(
ctx context.Context,
topic utils.TopicName,
applied bool,
) (utils.InactiveTopicPolicies, error) {
var out utils.InactiveTopicPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
_, err := t.pulsar.Client.GetWithQueryParamsWithContext(ctx, endpoint, &out, map[string]string{
"applied": strconv.FormatBool(applied),
}, true)
return out, err
}
func (t *topics) RemoveInactiveTopicPolicies(topic utils.TopicName) error {
return t.RemoveInactiveTopicPoliciesWithContext(context.Background(), topic)
}
func (t *topics) RemoveInactiveTopicPoliciesWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) SetInactiveTopicPolicies(topic utils.TopicName, data utils.InactiveTopicPolicies) error {
return t.SetInactiveTopicPoliciesWithContext(context.Background(), topic, data)
}
func (t *topics) SetInactiveTopicPoliciesWithContext(
ctx context.Context,
topic utils.TopicName,
data utils.InactiveTopicPolicies,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "inactiveTopicPolicies")
return t.pulsar.Client.PostWithContext(ctx, endpoint, data)
}
func (t *topics) SetReplicationClusters(topic utils.TopicName, data []string) error {
return t.SetReplicationClustersWithContext(context.Background(), topic, data)
}
func (t *topics) SetReplicationClustersWithContext(ctx context.Context, topic utils.TopicName, data []string) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replication")
return t.pulsar.Client.PostWithContext(ctx, endpoint, data)
}
func (t *topics) GetReplicationClusters(topic utils.TopicName) ([]string, error) {
return t.GetReplicationClustersWithContext(context.Background(), topic)
}
func (t *topics) GetReplicationClustersWithContext(ctx context.Context, topic utils.TopicName) ([]string, error) {
var data []string
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replication")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &data)
return data, err
}
func (t *topics) GetSubscribeRate(topic utils.TopicName) (*utils.SubscribeRate, error) {
return t.GetSubscribeRateWithContext(context.Background(), topic)
}
func (t *topics) GetSubscribeRateWithContext(ctx context.Context, topic utils.TopicName) (*utils.SubscribeRate, error) {
var subscribeRate utils.SubscribeRate
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &subscribeRate)
if body != nil {
return &subscribeRate, err
}
return nil, err
}
func (t *topics) SetSubscribeRate(topic utils.TopicName, subscribeRate utils.SubscribeRate) error {
return t.SetSubscribeRateWithContext(context.Background(), topic, subscribeRate)
}
func (t *topics) SetSubscribeRateWithContext(
ctx context.Context,
topic utils.TopicName,
subscribeRate utils.SubscribeRate,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &subscribeRate)
}
func (t *topics) RemoveSubscribeRate(topic utils.TopicName) error {
return t.RemoveSubscribeRateWithContext(context.Background(), topic)
}
func (t *topics) RemoveSubscribeRateWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscribeRate")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetSubscriptionDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
return t.GetSubscriptionDispatchRateWithContext(context.Background(), topic)
}
func (t *topics) GetSubscriptionDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DispatchRateData, error) {
var dispatchRate utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &dispatchRate)
if body != nil {
return &dispatchRate, err
}
return nil, err
}
func (t *topics) SetSubscriptionDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
return t.SetSubscriptionDispatchRateWithContext(context.Background(), topic, dispatchRate)
}
func (t *topics) SetSubscriptionDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
dispatchRate utils.DispatchRateData,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &dispatchRate)
}
func (t *topics) RemoveSubscriptionDispatchRate(topic utils.TopicName) error {
return t.RemoveSubscriptionDispatchRateWithContext(context.Background(), topic)
}
func (t *topics) RemoveSubscriptionDispatchRateWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "subscriptionDispatchRate")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetMaxConsumersPerSubscription(topic utils.TopicName) (int, error) {
return t.GetMaxConsumersPerSubscriptionWithContext(context.Background(), topic)
}
func (t *topics) GetMaxConsumersPerSubscriptionWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var maxConsumers = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxConsumers)
return maxConsumers, err
}
func (t *topics) SetMaxConsumersPerSubscription(topic utils.TopicName, maxConsumers int) error {
return t.SetMaxConsumersPerSubscriptionWithContext(context.Background(), topic, maxConsumers)
}
func (t *topics) SetMaxConsumersPerSubscriptionWithContext(
ctx context.Context,
topic utils.TopicName,
maxConsumers int,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &maxConsumers)
}
func (t *topics) RemoveMaxConsumersPerSubscription(topic utils.TopicName) error {
return t.RemoveMaxConsumersPerSubscriptionWithContext(context.Background(), topic)
}
func (t *topics) RemoveMaxConsumersPerSubscriptionWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxConsumersPerSubscription")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetMaxMessageSize(topic utils.TopicName) (int, error) {
return t.GetMaxMessageSizeWithContext(context.Background(), topic)
}
func (t *topics) GetMaxMessageSizeWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var maxMessageSize = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxMessageSize)
return maxMessageSize, err
}
func (t *topics) SetMaxMessageSize(topic utils.TopicName, maxMessageSize int) error {
return t.SetMaxMessageSizeWithContext(context.Background(), topic, maxMessageSize)
}
func (t *topics) SetMaxMessageSizeWithContext(ctx context.Context, topic utils.TopicName, maxMessageSize int) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &maxMessageSize)
}
func (t *topics) RemoveMaxMessageSize(topic utils.TopicName) error {
return t.RemoveMaxMessageSizeWithContext(context.Background(), topic)
}
func (t *topics) RemoveMaxMessageSizeWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxMessageSize")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetMaxSubscriptionsPerTopic(topic utils.TopicName) (int, error) {
return t.GetMaxSubscriptionsPerTopicWithContext(context.Background(), topic)
}
func (t *topics) GetMaxSubscriptionsPerTopicWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var maxSubscriptions = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &maxSubscriptions)
return maxSubscriptions, err
}
func (t *topics) SetMaxSubscriptionsPerTopic(topic utils.TopicName, maxSubscriptions int) error {
return t.SetMaxSubscriptionsPerTopicWithContext(context.Background(), topic, maxSubscriptions)
}
func (t *topics) SetMaxSubscriptionsPerTopicWithContext(
ctx context.Context,
topic utils.TopicName,
maxSubscriptions int,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &maxSubscriptions)
}
func (t *topics) RemoveMaxSubscriptionsPerTopic(topic utils.TopicName) error {
return t.RemoveMaxSubscriptionsPerTopicWithContext(context.Background(), topic)
}
func (t *topics) RemoveMaxSubscriptionsPerTopicWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "maxSubscriptionsPerTopic")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetSchemaValidationEnforced(topic utils.TopicName) (bool, error) {
return t.GetSchemaValidationEnforcedWithContext(context.Background(), topic)
}
func (t *topics) GetSchemaValidationEnforcedWithContext(ctx context.Context, topic utils.TopicName) (bool, error) {
var enabled bool
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &enabled)
return enabled, err
}
func (t *topics) SetSchemaValidationEnforced(topic utils.TopicName, enabled bool) error {
return t.SetSchemaValidationEnforcedWithContext(context.Background(), topic, enabled)
}
func (t *topics) SetSchemaValidationEnforcedWithContext(
ctx context.Context,
topic utils.TopicName,
enabled bool,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
return t.pulsar.Client.PostWithContext(ctx, endpoint, enabled)
}
func (t *topics) RemoveSchemaValidationEnforced(topic utils.TopicName) error {
return t.RemoveSchemaValidationEnforcedWithContext(context.Background(), topic)
}
func (t *topics) RemoveSchemaValidationEnforcedWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaValidationEnforced")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetDeduplicationSnapshotInterval(topic utils.TopicName) (int, error) {
return t.GetDeduplicationSnapshotIntervalWithContext(context.Background(), topic)
}
func (t *topics) GetDeduplicationSnapshotIntervalWithContext(ctx context.Context, topic utils.TopicName) (int, error) {
var interval = -1
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &interval)
return interval, err
}
func (t *topics) SetDeduplicationSnapshotInterval(topic utils.TopicName, interval int) error {
return t.SetDeduplicationSnapshotIntervalWithContext(context.Background(), topic, interval)
}
func (t *topics) SetDeduplicationSnapshotIntervalWithContext(
ctx context.Context,
topic utils.TopicName,
interval int,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &interval)
}
func (t *topics) RemoveDeduplicationSnapshotInterval(topic utils.TopicName) error {
return t.RemoveDeduplicationSnapshotIntervalWithContext(context.Background(), topic)
}
func (t *topics) RemoveDeduplicationSnapshotIntervalWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "deduplicationSnapshotInterval")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetReplicatorDispatchRate(topic utils.TopicName) (*utils.DispatchRateData, error) {
return t.GetReplicatorDispatchRateWithContext(context.Background(), topic)
}
func (t *topics) GetReplicatorDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.DispatchRateData, error) {
var dispatchRate utils.DispatchRateData
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &dispatchRate)
if body != nil {
return &dispatchRate, err
}
return nil, err
}
func (t *topics) SetReplicatorDispatchRate(topic utils.TopicName, dispatchRate utils.DispatchRateData) error {
return t.SetReplicatorDispatchRateWithContext(context.Background(), topic, dispatchRate)
}
func (t *topics) SetReplicatorDispatchRateWithContext(
ctx context.Context,
topic utils.TopicName,
dispatchRate utils.DispatchRateData,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &dispatchRate)
}
func (t *topics) RemoveReplicatorDispatchRate(topic utils.TopicName) error {
return t.RemoveReplicatorDispatchRateWithContext(context.Background(), topic)
}
func (t *topics) RemoveReplicatorDispatchRateWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "replicatorDispatchRate")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetAutoSubscriptionCreation(topic utils.TopicName) (*utils.AutoSubscriptionCreationOverride, error) {
return t.GetAutoSubscriptionCreationWithContext(context.Background(), topic)
}
func (t *topics) GetAutoSubscriptionCreationWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.AutoSubscriptionCreationOverride, error) {
var autoSubCreation utils.AutoSubscriptionCreationOverride
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &autoSubCreation)
if body != nil {
return &autoSubCreation, err
}
return nil, err
}
func (t *topics) SetAutoSubscriptionCreation(topic utils.TopicName,
autoSubCreation utils.AutoSubscriptionCreationOverride) error {
return t.SetAutoSubscriptionCreationWithContext(context.Background(), topic, autoSubCreation)
}
func (t *topics) SetAutoSubscriptionCreationWithContext(ctx context.Context, topic utils.TopicName,
autoSubCreation utils.AutoSubscriptionCreationOverride) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &autoSubCreation)
}
func (t *topics) RemoveAutoSubscriptionCreation(topic utils.TopicName) error {
return t.RemoveAutoSubscriptionCreationWithContext(context.Background(), topic)
}
func (t *topics) RemoveAutoSubscriptionCreationWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "autoSubscriptionCreation")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetSchemaCompatibilityStrategy(topic utils.TopicName) (utils.SchemaCompatibilityStrategy, error) {
return t.GetSchemaCompatibilityStrategyWithContext(context.Background(), topic)
}
func (t *topics) GetSchemaCompatibilityStrategyWithContext(
ctx context.Context,
topic utils.TopicName,
) (utils.SchemaCompatibilityStrategy, error) {
var strategy utils.SchemaCompatibilityStrategy
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
err := t.pulsar.Client.GetWithContext(ctx, endpoint, &strategy)
return strategy, err
}
func (t *topics) SetSchemaCompatibilityStrategy(topic utils.TopicName,
strategy utils.SchemaCompatibilityStrategy) error {
return t.SetSchemaCompatibilityStrategyWithContext(context.Background(), topic, strategy)
}
func (t *topics) SetSchemaCompatibilityStrategyWithContext(ctx context.Context, topic utils.TopicName,
strategy utils.SchemaCompatibilityStrategy) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
return t.pulsar.Client.PutWithContext(ctx, endpoint, strategy)
}
func (t *topics) RemoveSchemaCompatibilityStrategy(topic utils.TopicName) error {
return t.RemoveSchemaCompatibilityStrategyWithContext(context.Background(), topic)
}
func (t *topics) RemoveSchemaCompatibilityStrategyWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "schemaCompatibilityStrategy")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}
func (t *topics) GetOffloadPolicies(topic utils.TopicName) (*utils.OffloadPolicies, error) {
return t.GetOffloadPoliciesWithContext(context.Background(), topic)
}
func (t *topics) GetOffloadPoliciesWithContext(
ctx context.Context,
topic utils.TopicName,
) (*utils.OffloadPolicies, error) {
var offloadPolicies utils.OffloadPolicies
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
body, err := t.pulsar.Client.GetBodyWithContext(ctx, endpoint, &offloadPolicies)
if body != nil {
return &offloadPolicies, err
}
return nil, err
}
func (t *topics) SetOffloadPolicies(topic utils.TopicName, offloadPolicies utils.OffloadPolicies) error {
return t.SetOffloadPoliciesWithContext(context.Background(), topic, offloadPolicies)
}
func (t *topics) SetOffloadPoliciesWithContext(
ctx context.Context,
topic utils.TopicName,
offloadPolicies utils.OffloadPolicies,
) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
return t.pulsar.Client.PostWithContext(ctx, endpoint, &offloadPolicies)
}
func (t *topics) RemoveOffloadPolicies(topic utils.TopicName) error {
return t.RemoveOffloadPoliciesWithContext(context.Background(), topic)
}
func (t *topics) RemoveOffloadPoliciesWithContext(ctx context.Context, topic utils.TopicName) error {
endpoint := t.pulsar.endpoint(t.basePath, topic.GetRestPath(), "offloadPolicies")
return t.pulsar.Client.DeleteWithContext(ctx, endpoint)
}