blob: 45b1b301d33839a3cf03aff5e615dac4f0a2b37b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v2;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import static org.apache.pulsar.common.util.Codec.decode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Maps;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*/
@Path("/persistent")
@Produces(MediaType.APPLICATION_JSON)
@Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic")
public class PersistentTopics extends PersistentTopicsBase {
@GET
@Path("/{tenant}/{namespace}")
@ApiOperation(value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void getList(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
asyncResponse.resume(internalGetList());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("/{tenant}/{namespace}/partitioned")
@ApiOperation(value = "Get the list of partitioned topics under a namespace.", response = String.class, responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Namespace name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public List<String> getPartitionedTopicList(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetPartitionedTopicList();
}
@GET
@Path("/{tenant}/{namespace}/{topic}/permissions")
@ApiOperation(value = "Get permissions on a topic.", notes = "Retrieve the effective permissions for a topic. These permissions are defined by the permissions set at the"
+ "namespace level combined (union) with any eventual specific permission set on the topic.")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public Map<String, Set<AuthAction>> getPermissionsOnTopic(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetPermissionsOnTopic();
}
@POST
@Path("/{tenant}/{namespace}/{topic}/permissions/{role}")
@ApiOperation(value = "Grant a new permission to a role on a single topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error") })
public void grantPermissionsOnTopic(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Client role to which grant permissions", required = true)
@PathParam("role") String role,
@ApiParam(value = "Actions to be granted (produce,functions,consume)", allowableValues = "produce,functions,consume")
Set<AuthAction> actions) {
validateTopicName(tenant, namespace, encodedTopic);
internalGrantPermissionsOnTopic(role, actions);
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/permissions/{role}")
@ApiOperation(value = "Revoke permissions on a topic.", notes = "Revoke permissions to a role on a single topic. If the permission was not set at the topic"
+ "level, but rather at the namespace level, this operation will return an error (HTTP status code 412).")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "tenant/namespace/topic doesn't exit"),
@ApiResponse(code = 412, message = "Permissions are not set at the topic level"),
@ApiResponse(code = 500, message = "Internal server error") })
public void revokePermissionsOnTopic(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Client role to which grant permissions", required = true)
@PathParam("role") String role) {
validateTopicName(tenant, namespace, encodedTopic);
internalRevokePermissionsOnTopic(role);
}
@PUT
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createPartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
int numPartitions) {
try {
validateGlobalNamespaceOwnership(tenant,namespace);
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(topicName.getTenant());
internalCreatePartitionedTopic(asyncResponse, numPartitions);
} catch (Exception e) {
log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
@PUT
@Path("/{tenant}/{namespace}/{topic}")
@ApiOperation(value="Create a non-partitioned topic.", notes = "This is the only REST endpoint from which non-partitioned topics could be created.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 409, message = "Partitioned topic already exist"),
@ApiResponse(code = 412, message = "Failed Reason : Name is invalid or Namespace does not have any clusters configured"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createNonPartitionedTopic(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateGlobalNamespaceOwnership(tenant,namespace);
validateTopicName(tenant, namespace, encodedTopic);
internalCreateNonPartitionedTopic(authoritative);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
@ApiOperation(value = "Get offload policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isOffloadPoliciesSet()) {
asyncResponse.resume(topicPolicies.getOffloadPolicies());
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
@ApiOperation(value = "Set offload policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Offload policies for the specified topic")
OffloadPolicies offloadPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetOffloadPolicies(offloadPolicies).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set offloadPolicies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set offloadPolicies", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/offloadPolicies")
@ApiOperation(value = "Delete offload policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void removeOffloadPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
setOffloadPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Get max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnConsumer());
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Set max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set MaxUnackedMessagesOnConsumer", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
@ApiOperation(value = "Get deduplicationSnapshotInterval config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) {
asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
@ApiOperation(value = "Set deduplicationSnapshotInterval config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Interval to take deduplication snapshot for the specified topic")
Integer interval) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
internalSetDeduplicationSnapshotInterval(interval).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set deduplicationSnapshotInterval", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set deduplicationSnapshotInterval", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/deduplicationSnapshotInterval")
@ApiOperation(value = "Delete deduplicationSnapshotInterval config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetDeduplicationSnapshotInterval(null).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed delete deduplicationSnapshotInterval", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed delete deduplicationSnapshotInterval", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer")
@ApiOperation(value = "Delete max unacked messages per consumer config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
@ApiOperation(value = "Get inactive topic policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isInactiveTopicPoliciesSet()) {
asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
@ApiOperation(value = "Set inactive topic policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "inactive topic policies for the specified topic")
InactiveTopicPolicies inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set InactiveTopicPolicies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set InactiveTopicPolicies", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
@ApiOperation(value = "Delete inactive topic policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setInactiveTopicPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Get max unacked messages per subscription config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
asyncResponse.resume(topicPolicies.getMaxUnackedMessagesOnSubscription());
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Set max unacked messages per subscription config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Max unacked messages on subscription policies for the specified topic")
Integer maxUnackedNum) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set MaxUnackedMessagesOnSubscription", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set MaxUnackedMessagesOnSubscription", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Delete max unacked messages per subscription config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setMaxUnackedMessagesOnSubscription(asyncResponse, tenant, namespace, encodedTopic, null);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
@ApiOperation(value = "Get delayed delivery messages config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isDelayedDeliveryEnabledSet() && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
asyncResponse.resume(new DelayedDeliveryPolicies(topicPolicies.getDelayedDeliveryTickTimeMillis()
, topicPolicies.getDelayedDeliveryEnabled()));
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
@ApiOperation(value = "Set delayed delivery messages config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Delayed delivery policies for the specified topic") DelayedDeliveryPolicies deliveryPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies);
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/delayedDelivery")
@ApiOperation(value = "Set delayed delivery messages config on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setDelayedDeliveryPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
}
/**
* It updates number of partitions of an existing non-global partitioned topic. It requires partitioned-topic to be
* already exist and number of new partitions must be greater than existing number of partitions. Decrementing
* number of partitions requires deletion of topic which is not supported.
*
* Already created partitioned producers and consumers can't see newly created partitions and it requires to
* recreate them at application so, newly created producers and consumers can connect to newly added partitions as
* well. Therefore, it can violate partition ordering at producers until all producers are restarted at application.
*
* @param tenant
* @param namespace
* @param encodedTopic
* @param numPartitions
*/
@POST
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Increment partitions of an existing partitioned topic.", notes = "It only increments partitions of existing non-global partitioned-topic")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 406, message = "The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void updatePartitionedTopic(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean updateLocalTopicOnly,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
int numPartitions) {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
validatePartitionedTopicMetadata(tenant, namespace, encodedTopic);
internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly, authoritative);
}
@POST
@Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
@ApiOperation(value = "Create missed partitions of an existing partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Partitioned topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void createMissedPartitions(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalCreateMissedPartitions(asyncResponse);
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
@GET
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Get partitioned topic metadata.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public PartitionedTopicMetadata getPartitionedMetadata(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Is check configuration required to automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicExistedAndCheckAllowAutoCreation(tenant, namespace, encodedTopic, checkAllowAutoCreation);
return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.", notes = "It will also delete all the partitions of the topic if it exists.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Partitioned topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
public void deletePartitionedTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully", defaultValue = "false", type = "boolean")
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Delete the topic's schema storage")
@QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalDeletePartitionedTopic(asyncResponse, authoritative, force, deleteSchema);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@PUT
@Path("/{tenant}/{namespace}/{topic}/unload")
@ApiOperation(value = "Unload a topic")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Topic name is not valid or can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void unloadTopic(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalUnloadTopic(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Delete a topic.", notes = "The topic cannot be deleted if delete is not forcefully and there's any active "
+ "subscription or producer connected to the it. Force delete ignores connected clients and deletes topic by explicitly closing them.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic has active producers/subscriptions"),
@ApiResponse(code = 500, message = "Internal server error") })
public void deleteTopic(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Stop all producer/consumer/replicator and delete topic forcefully", defaultValue = "false", type = "boolean")
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Delete the topic's schema storage")
@QueryParam("deleteSchema") @DefaultValue("false") boolean deleteSchema) {
validateTopicName(tenant, namespace, encodedTopic);
internalDeleteTopic(authoritative, force, deleteSchema);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/subscriptions")
@ApiOperation(value = "Get the list of persistent subscriptions for a given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
})
public void getSubscriptions(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetSubscriptions(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("{tenant}/{namespace}/{topic}/stats")
@ApiOperation(value = "Get the stats for the topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public TopicStats getStats(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetStats(authoritative, getPreciseBacklog, subscriptionBacklogSize);
}
@GET
@Path("{tenant}/{namespace}/{topic}/internalStats")
@ApiOperation(value = "Get the internal stats for the topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public PersistentTopicInternalStats getInternalStats(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetInternalStats(authoritative, metadata);
}
@GET
@Path("{tenant}/{namespace}/{topic}/internal-info")
@ApiOperation(value = "Get the stored topic metadata.")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void getManagedLedgerInfo(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic")
@Encoded String encodedTopic,@Suspended AsyncResponse asyncResponse) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetManagedLedgerInfo(asyncResponse, authoritative);
}
@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-stats")
@ApiOperation(value = "Get the stats for the partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void getPartitionedStats(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Get per partition stats")
@QueryParam("perPartition") @DefaultValue("true") boolean perPartition,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "If return precise backlog or imprecise backlog")
@QueryParam("getPreciseBacklog") @DefaultValue("false") boolean getPreciseBacklog,
@ApiParam(value = "If return backlog size for each subscription, require locking on ledger so be careful "
+ "not to use when there's heavy traffic.")
@QueryParam("subscriptionBacklogSize") @DefaultValue("false") boolean subscriptionBacklogSize) {
try {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStats(asyncResponse, authoritative, perPartition, getPreciseBacklog,
subscriptionBacklogSize);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("{tenant}/{namespace}/{topic}/partitioned-internalStats")
@ApiOperation(hidden = true, value = "Get the stats-internal for the partitioned topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void getPartitionedStatsInternal(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetPartitionedStatsInternal(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}")
@ApiOperation(value = "Delete a subscription.", notes = "The subscription cannot be deleted if delete is not forcefully and there are any active consumers attached to it. "
+ "Force delete ignores connected consumers and deletes subscription by explicitly closing them.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 412, message = "Subscription has active consumers"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void deleteSubscription(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be deleted")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Disconnect and close all consumers and delete subscription forcefully", defaultValue = "false", type = "boolean")
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/skip_all")
@ApiOperation(value = "Skip all messages on a topic subscription.", notes = "Completely clears the backlog on the subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void skipAllMessages(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Name of subscription")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalSkipAllMessages(asyncResponse, decode(encodedSubName), authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/skip/{numMessages}")
@ApiOperation(value = "Skipping messages on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a partitioned topic is not allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void skipMessages(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Name of subscription")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "The number of messages to skip", defaultValue = "0")
@PathParam("numMessages") int numMessages,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalSkipMessages(decode(encodedSubName), numMessages, authoritative);
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages/{expireTimeInSeconds}")
@ApiOperation(value = "Expiry messages on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void expireTopicMessages(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be Expiry messages on")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Expires beyond the specified number of seconds", defaultValue = "0")
@PathParam("expireTimeInSeconds") int expireTimeInSeconds,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalExpireMessagesByTimestamp(asyncResponse, decode(encodedSubName),
expireTimeInSeconds, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/expireMessages")
@ApiOperation(value = "Expiry messages on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void expireTopicMessages(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to be Expiry messages on")
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
ResetCursorData resetCursorData) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalExpireMessagesByPosition(asyncResponse, decode(encodedSubName), authoritative,
new MessageIdImpl(resetCursorData.getLedgerId(),
resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
, resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/all_subscription/expireMessages/{expireTimeInSeconds}")
@ApiOperation(value = "Expiry messages on all subscriptions of topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic or subscription does not exist"),
@ApiResponse(code = 405, message = "Expiry messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 412, message = "Can't find owner for topic"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void expireMessagesForAllSubscriptions(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Expires beyond the specified number of seconds", defaultValue = "0")
@PathParam("expireTimeInSeconds") int expireTimeInSeconds,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalExpireMessagesForAllSubscriptions(asyncResponse, expireTimeInSeconds, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@PUT
@Path("/{tenant}/{namespace}/{topic}/subscription/{subscriptionName}")
@ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 400, message = "Create subscription on non persistent topic is not supported"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Not supported for partitioned topics"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void createSubscription(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String topic,
@ApiParam(value = "Subscription to create position on", required = true)
@PathParam("subscriptionName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(name = "messageId", value = "messageId where to create the subscription. " +
"It can be 'latest', 'earliest' or (ledgerId:entryId)",
defaultValue = "latest",
allowableValues = "latest,earliest,ledgerId:entryId"
)
MessageIdImpl messageId,
@ApiParam(value = "Is replicated required to perform this operation")
@QueryParam("replicated") boolean replicated
) {
try {
validateTopicName(tenant, namespace, topic);
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" +
"can only be done through client");
}
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor/{timestamp}")
@ApiOperation(value = "Reset subscription to message position closest to absolute timestamp (in ms).", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Method Not Allowed"),
@ApiResponse(code = 412, message = "Failed to reset cursor on subscription or " +
"Unable to find position for timestamp specified"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void resetCursor(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription to reset position on", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "time in minutes to reset back to (or minutes, hours,days,weeks eg:100m, 3h, 2d, 5w)")
@PathParam("timestamp") long timestamp,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalResetCursor(asyncResponse, decode(encodedSubName), timestamp, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetcursor")
@ApiOperation(value = "Reset subscription to message position closest to given position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Not supported for partitioned topics"),
@ApiResponse(code = 412, message = "Unable to find position for position specified"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void resetCursorOnPosition(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(name = "subName", value = "Subscription to reset position on", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
ResetCursorData resetCursorData) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative
, new MessageIdImpl(resetCursorData.getLedgerId(), resetCursorData.getEntryId(), resetCursorData.getPartitionIndex())
, resetCursorData.isExcluded(), resetCursorData.getBatchIndex());
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
@GET
@Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/position/{messagePosition}")
@ApiOperation(value = "Peek nth message on a topic subscription.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public Response peekNthMessage(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(name = "subName", value = "Subscribed message expired", required = true)
@PathParam("subName") String encodedSubName,
@ApiParam(value = "The number of messages (default 1)", defaultValue = "1")
@PathParam("messagePosition") int messagePosition,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalPeekNthMessage(decode(encodedSubName), messagePosition, authoritative);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/examinemessage")
@ApiOperation(value = "Examine a specific message on a topic by position relative to the earliest or the latest message.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, the message position does not exist"),
@ApiResponse(code = 405, message = "If given partitioned topic"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error")})
public Response examineMessage(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(name = "initialPosition", value = "Relative start position to examine message." +
"It can be 'latest' or 'earliest'",
defaultValue = "latest",
allowableValues = "latest, earliest"
)
@QueryParam("initialPosition") String initialPosition,
@ApiParam(value = "The position of messages (default 1)", defaultValue = "1")
@QueryParam("messagePosition") long messagePosition,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalExamineMessage(initialPosition, messagePosition, authoritative);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/ledger/{ledgerId}/entry/{entryId}")
@ApiOperation(value = "Get message by its messageId.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic, subscription or the message position does not exist"),
@ApiResponse(code = 405, message = "Skipping messages on a non-persistent topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public void getMessageById(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The ledger id", required = true)
@PathParam("ledgerId") long ledgerId,
@ApiParam(value = "The entry id", required = true)
@PathParam("entryId") long entryId,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetMessageById(asyncResponse, ledgerId, entryId, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("{tenant}/{namespace}/{topic}/backlog")
@ApiOperation(value = "Get estimated backlog for offline topic.")
@ApiResponses(value = {
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public PersistentOfflineTopicStats getBacklog(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalGetBacklog(authoritative);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/backlogQuotaMap")
@ApiOperation(value = "Get backlog quota map on a topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic policy does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public Map<BacklogQuota.BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
return getTopicPolicies(topicName)
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> {
HashMap<BacklogQuota.BacklogQuotaType, BacklogQuota> hashMap = Maps.newHashMap();
map.forEach((key,value) -> {
hashMap.put(BacklogQuota.BacklogQuotaType.valueOf(key),value);
});
return hashMap;
})
.orElse(Maps.newHashMap());
}
@POST
@Path("/{tenant}/{namespace}/{topic}/backlogQuota")
@ApiOperation(value = "Set a backlog quota for a topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 412, message = "Specified backlog quota exceeds retention quota. Increase retention quota and retry request") })
public void setBacklogQuota(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota);
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/backlogQuota")
@ApiOperation(value = "Remove a backlog quota policy from a topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("backlogQuotaType") BacklogQuota.BacklogQuotaType backlogQuotaType) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveBacklogQuota(asyncResponse, backlogQuotaType);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Get message TTL in seconds for a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry")})
public int getMessageTTL(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
return getTopicPolicies(topicName)
.map(TopicPolicies::getMessageTTLInSeconds)
.orElse(0); //same as default ttl at namespace level
}
@POST
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Set message TTL in seconds for a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 412, message = "Invalid message TTL value") })
public void setMessageTTL(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "TTL in seconds for the specified namespace", required = true)
@QueryParam("messageTTL") int messageTTL) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMessageTTL(asyncResponse, messageTTL);
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/messageTTL")
@ApiOperation(value = "Remove message TTL in seconds for a topic")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Not authenticate to perform the request or policy is read only"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry"),
@ApiResponse(code = 412, message = "Invalid message TTL value") })
public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMessageTTL(asyncResponse, null);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
@ApiOperation(value = "Get deduplication configuration of a topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public void getDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isDeduplicationSet()) {
asyncResponse.resume(topicPolicies.getDeduplicationEnabled());
} else {
asyncResponse.resume(Response.noContent().build());
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
@ApiOperation(value = "Set deduplication enabled on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")})
public void setDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetDeduplicationEnabled(enabled).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated deduplication", ex);
asyncResponse.resume(ex);
}else if (ex != null) {
log.error("Failed updated deduplication", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/deduplicationEnabled")
@ApiOperation(value = "Remove deduplication configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void removeDeduplicationEnabled(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setDeduplicationEnabled(asyncResponse, tenant, namespace, encodedTopic, null);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/retention")
@ApiOperation(value = "Get retention configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void getRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
internalGetRetention(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/retention")
@ApiOperation(value = "Set retention configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
public void setRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetRetention(retention).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated retention", ex);
asyncResponse.resume(ex);
}else if (ex != null) {
log.error("Failed updated retention", ex);
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully updated retention: namespace={}, topic={}, retention={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(retention));
} catch (JsonProcessingException ignore) { }
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/retention")
@ApiOperation(value = "Remove retention configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota") })
public void removeRetention(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveRetention().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed updated retention", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove retention: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(value = "Get configuration of persistence policies for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<PersistencePolicies> persistencePolicies = internalGetPersistence();
if (!persistencePolicies.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(persistencePolicies.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(value = "Set configuration of persistence policies for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 400, message = "Invalid persistence policies")})
public void setPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Bookkeeper persistence policies for specified topic") PersistencePolicies persistencePolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetPersistence(persistencePolicies).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully updated persistence policies: namespace={}, topic={}, persistencePolicies={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(persistencePolicies));
} catch (JsonProcessingException ignore) {
}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(value = "Remove configuration of persistence policies for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removePersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemovePersistence().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed updated retention", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
@ApiOperation(value = "Get maxSubscriptionsPerTopic config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<Integer> maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic();
if (!maxSubscriptionsPerTopic.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxSubscriptionsPerTopic.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
@ApiOperation(value = "Set maxSubscriptionsPerTopic config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of maxSubscriptionsPerTopic")})
public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Updating maxSubscriptionsPerTopic failed", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Updating maxSubscriptionsPerTopic failed", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
+ ", maxSubscriptions={}"
, clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic);
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxSubscriptionsPerTopic")
@ApiOperation(value = "Remove maxSubscriptionsPerTopic config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405,
message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxSubscriptionsPerTopic", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove maximum subscription limit: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Get maxProducers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxProducers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<Integer> maxProducers = internalGetMaxProducers();
if (!maxProducers.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxProducers.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Set maxProducers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of maxProducers")})
public void setMaxProducers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The max producers of the topic") int maxProducers) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxProducers(maxProducers).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
maxProducers);
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxProducers")
@ApiOperation(value = "Remove maxProducers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveMaxProducers().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxProducers", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove max producers: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxConsumers")
@ApiOperation(value = "Get maxConsumers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<Integer> maxConsumers = internalGetMaxConsumers();
if (!maxConsumers.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxConsumers.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxConsumers")
@ApiOperation(value = "Set maxConsumers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
public void setMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The max consumers of the topic") int maxConsumers) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxConsumers(maxConsumers).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
maxConsumers);
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxConsumers")
@ApiOperation(value = "Remove maxConsumers config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveMaxConsumers().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxConsumers", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove max consumers: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
@ApiOperation(value = "Get maxMessageSize config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<Integer> policies = internalGetMaxMessageSize();
if (policies.isPresent()) {
asyncResponse.resume(policies.get());
} else {
asyncResponse.resume(Response.noContent().build());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
@ApiOperation(value = "Set maxMessageSize config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "Invalid value of maxConsumers")})
public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "The max message size of the topic") int maxMessageSize) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSiz={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
maxMessageSize);
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxMessageSize")
@ApiOperation(value = "Remove maxMessageSize config for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxMessageSize(null).whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove maxMessageSize", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove max message size: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
@ApiOperation(value = "Terminate a topic. A topic that is terminated will not accept any more "
+ "messages to be published and will let consumer to drain existing messages in backlog")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Termination of a partitioned topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public MessageId terminate(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalTerminate(authoritative);
}
@POST
@Path("/{tenant}/{namespace}/{topic}/terminate/partitions")
@ApiOperation(value = "Terminate all partitioned topic. A topic that is terminated will not accept any more "
+ "messages to be published and will let consumer to drain existing messages in backlog")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Termination of a partitioned topic is not allowed"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void terminatePartitionedTopic( @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
internalTerminatePartitionedTopic(asyncResponse, authoritative);
}
@PUT
@Path("/{tenant}/{namespace}/{topic}/compaction")
@ApiOperation(value = "Trigger a compaction operation on a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
@ApiResponse(code = 409, message = "Compaction already running"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void compact(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalTriggerCompaction(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("/{tenant}/{namespace}/{topic}/compaction")
@ApiOperation(value = "Get the status of a compaction operation for a topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run"),
@ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public LongRunningProcessStatus compactionStatus(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalCompactionStatus(authoritative);
}
@PUT
@Path("/{tenant}/{namespace}/{topic}/offload")
@ApiOperation(value = "Offload a prefix of a topic to long term storage")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 400, message = "Message ID is null"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
@ApiResponse(code = 409, message = "Offload already running"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void triggerOffload(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
MessageIdImpl messageId) {
if (messageId == null) {
throw new RestException(Response.Status.BAD_REQUEST, "messageId is null");
}
validateTopicName(tenant, namespace, encodedTopic);
internalTriggerOffload(authoritative, messageId);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/offload")
@ApiOperation(value = "Offload a prefix of a topic to long term storage")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public OffloadProcessStatus offloadStatus(
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalOffloadStatus(authoritative);
}
@GET
@Path("/{tenant}/{namespace}/{topic}/lastMessageId")
@ApiOperation(value = "Return the last commit message id of topic")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Operation is not allowed on the persistent topic"),
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
public void getLastMessageId(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetLastMessageId(asyncResponse, authoritative);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@GET
@Path("/{tenant}/{namespace}/{topic}/dispatchRate")
@ApiOperation(value = "Get dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<DispatchRate> dispatchRate = internalGetDispatchRate();
if (!dispatchRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(dispatchRate.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/dispatchRate")
@ApiOperation(value = "Set message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Dispatch rate for the specified topic") DispatchRate dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetDispatchRate(dispatchRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic dispatch rate");
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(dispatchRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/dispatchRate")
@ApiOperation(value = "Remove message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveDispatchRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic dispatch rate", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
@ApiOperation(value = "Get subscription message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<DispatchRate> dispatchRate = internalGetSubscriptionDispatchRate();
if (!dispatchRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(dispatchRate.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
@ApiOperation(value = "Set subscription message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRate dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName());
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic subscription dispatch rate: tenant={}, namespace={}, topic={}, dispatchRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(dispatchRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscriptionDispatchRate")
@ApiOperation(value = "Remove subscription message dispatch rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic: {} subscription dispatch rate", topicName.getLocalName(), ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Get compaction threshold configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<Long> compactionThreshold = internalGetCompactionThreshold();
if (!compactionThreshold.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(compactionThreshold.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Set compaction threshold configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetCompactionThreshold(compactionThreshold).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic dispatch rate");
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic compaction threshold: tenant={}, namespace={}, topic={}, compactionThreshold={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(compactionThreshold));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/compactionThreshold")
@ApiOperation(value = "Remove compaction threshold configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeCompactionThreshold(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveCompactionThreshold().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic dispatch rate", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
@ApiOperation(value = "Get max consumers per subscription configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<Integer> maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription();
if (!maxConsumersPerSubscription.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(maxConsumersPerSubscription.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
@ApiOperation(value = "Set max consumers per subscription configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic {} max consumers per subscription ", topicName.getLocalName(), ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic max consumers per subscription");
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic max consumers per subscription: tenant={}, namespace={}, topic={}, maxConsumersPerSubscription={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(maxConsumersPerSubscription));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/maxConsumersPerSubscription")
@ApiOperation(value = "Remove max consumers per subscription configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveMaxConsumersPerSubscription().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic {} max consuners per subscription", topicName.getLocalName(), ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic max consumers per subscription: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Get publish rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getPublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<PublishRate> publishRate = internalGetPublishRate();
if (!publishRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(publishRate.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Set message publish rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setPublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetPublishRate(publishRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic dispatch rate", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic dispatch rate");
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic publish rate: tenant={}, namespace={}, topic={}, publishRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(publishRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/publishRate")
@ApiOperation(value = "Remove message publish rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removePublishRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemovePublishRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic publish rate", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
@GET
@Path("/{tenant}/{namespace}/{topic}/subscribeRate")
@ApiOperation(value = "Get subscribe rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
try {
Optional<SubscribeRate> subscribeRate = internalGetSubscribeRate();
if (!subscribeRate.isPresent()) {
asyncResponse.resume(Response.noContent().build());
} else {
asyncResponse.resume(subscribeRate.get());
}
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
@POST
@Path("/{tenant}/{namespace}/{topic}/subscribeRate")
@ApiOperation(value = "Set subscribe rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void setSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalSetSubscribeRate(subscribeRate).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed to set topic {} subscribe rate", topicName.getLocalName(), ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed to set topic subscribe rate");
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully set topic subscribe rate: tenant={}, namespace={}, topic={}, subscribeRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
jsonMapper().writeValueAsString(subscribeRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
}
});
}
@DELETE
@Path("/{tenant}/{namespace}/{topic}/subscribeRate")
@ApiOperation(value = "Remove subscribe rate configuration for specified topic.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Topic does not exist"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, please enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation();
internalRemoveSubscribeRate().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed to remove topic {} subscribe rate ", topicName.getLocalName(), ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}