blob: 7ad8f04b507fa639cf0cb16851f656a280ddb19f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopicsImpl extends BaseResource implements Topics {
private final WebTarget adminTopics;
private final WebTarget adminV2Topics;
// CHECKSTYLE.OFF: MemberName
static private final String BATCH_HEADER = "X-Pulsar-num-batch-message";
static private final String MESSAGE_ID = "X-Pulsar-Message-ID";
static private final String PUBLISH_TIME = "X-Pulsar-publish-time";
// CHECKSTYLE.ON: MemberName
public TopicsImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminTopics = web.path("/admin");
adminV2Topics = web.path("/admin/v2");
}
@Override
public List<String> getList(String namespace) throws PulsarAdminException {
try {
return getListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<List<String>> getListAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget persistentPath = namespacePath("persistent", ns);
WebTarget nonPersistentPath = namespacePath("non-persistent", ns);
final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
asyncGetRequest(persistentPath,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> topics) {
persistentList.complete(topics);
}
@Override
public void failed(Throwable throwable) {
persistentList.completeExceptionally(getApiException(throwable.getCause()));
}
});
asyncGetRequest(nonPersistentPath,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> a) {
nonPersistentList.complete(a);
}
@Override
public void failed(Throwable throwable) {
nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
}
});
return persistentList.thenCombine(nonPersistentList,
(l1, l2) -> new ArrayList<>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
}
@Override
public List<String> getPartitionedTopicList(String namespace) throws PulsarAdminException {
try {
return getPartitionedTopicListAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<List<String>> getPartitionedTopicListAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget persistentPath = namespacePath("persistent", ns, "partitioned");
WebTarget nonPersistentPath = namespacePath("non-persistent", ns, "partitioned");
final CompletableFuture<List<String>> persistentList = new CompletableFuture<>();
final CompletableFuture<List<String>> nonPersistentList = new CompletableFuture<>();
asyncGetRequest(persistentPath,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> topics) {
persistentList.complete(topics);
}
@Override
public void failed(Throwable throwable) {
persistentList.completeExceptionally(getApiException(throwable.getCause()));
}
});
asyncGetRequest(nonPersistentPath,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> topics) {
nonPersistentList.complete(topics);
}
@Override
public void failed(Throwable throwable) {
nonPersistentList.completeExceptionally(getApiException(throwable.getCause()));
}
});
return persistentList.thenCombine(nonPersistentList,
(l1, l2) -> new ArrayList<>(Stream.concat(l1.stream(), l2.stream()).collect(Collectors.toSet())));
}
@Override
public List<String> getListInBundle(String namespace, String bundleRange) throws PulsarAdminException {
try {
return getListInBundleAsync(namespace, bundleRange).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<List<String>> getListInBundleAsync(String namespace, String bundleRange) {
NamespaceName ns = NamespaceName.get(namespace);
final CompletableFuture<List<String>> future = new CompletableFuture<>();
WebTarget path = namespacePath("non-persistent", ns, bundleRange);
asyncGetRequest(path,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> response) {
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public Map<String, Set<AuthAction>> getPermissions(String topic) throws PulsarAdminException {
try {
return getPermissionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Map<String, Set<AuthAction>>> getPermissionsAsync(String topic) {
TopicName tn = TopicName.get(topic);
WebTarget path = topicPath(tn, "permissions");
final CompletableFuture<Map<String, Set<AuthAction>>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Map<String, Set<AuthAction>>>() {
@Override
public void completed(Map<String, Set<AuthAction>> permissions) {
future.complete(permissions);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void grantPermission(String topic, String role, Set<AuthAction> actions) throws PulsarAdminException {
try {
grantPermissionAsync(topic, role, actions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> grantPermissionAsync(String topic, String role, Set<AuthAction> actions) {
TopicName tn = TopicName.get(topic);
WebTarget path = topicPath(tn, "permissions", role);
return asyncPostRequest(path, Entity.entity(actions, MediaType.APPLICATION_JSON));
}
@Override
public void revokePermissions(String topic, String role) throws PulsarAdminException {
try {
revokePermissionsAsync(topic, role).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> revokePermissionsAsync(String topic, String role) {
TopicName tn = TopicName.get(topic);
WebTarget path = topicPath(tn, "permissions", role);
return asyncDeleteRequest(path);
}
@Override
public void createPartitionedTopic(String topic, int numPartitions) throws PulsarAdminException {
try {
createPartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public void createNonPartitionedTopic(String topic) throws PulsarAdminException {
try {
createNonPartitionedTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public void createMissedPartitions(String topic) throws PulsarAdminException {
try {
createMissedPartitionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> createNonPartitionedTopicAsync(String topic){
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn);
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public CompletableFuture<Void> createPartitionedTopicAsync(String topic, int numPartitions) {
checkArgument(numPartitions > 0, "Number of partitions should be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
return asyncPutRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
}
@Override
public CompletableFuture<Void> createMissedPartitionsAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "createMissedPartitions");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public void updatePartitionedTopic(String topic, int numPartitions)
throws PulsarAdminException {
try {
updatePartitionedTopicAsync(topic, numPartitions).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions) {
return updatePartitionedTopicAsync(topic, numPartitions, false);
}
@Override
public void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocalTopicOnly)
throws PulsarAdminException {
try {
updatePartitionedTopicAsync(topic, numPartitions, updateLocalTopicOnly)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> updatePartitionedTopicAsync(String topic, int numPartitions,
boolean updateLocalTopicOnly) {
checkArgument(numPartitions > 0, "Number of partitions must be more than 0");
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
path = path.queryParam("updateLocalTopicOnly", Boolean.toString(updateLocalTopicOnly));
return asyncPostRequest(path, Entity.entity(numPartitions, MediaType.APPLICATION_JSON));
}
@Override
public PartitionedTopicMetadata getPartitionedTopicMetadata(String topic) throws PulsarAdminException {
try {
return getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions");
final CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PartitionedTopicMetadata>() {
@Override
public void completed(PartitionedTopicMetadata response) {
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
deletePartitionedTopic(topic, false);
}
@Override
public CompletableFuture<Void> deletePartitionedTopicAsync(String topic) {
return deletePartitionedTopicAsync(topic, false);
}
@Override
public void deletePartitionedTopic(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException {
try {
deletePartitionedTopicAsync(topic, force, deleteSchema).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> deletePartitionedTopicAsync(String topic, boolean force, boolean deleteSchema) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitions") //
.queryParam("force", Boolean.toString(force)) //
.queryParam("deleteSchema", Boolean.toString(deleteSchema));
return asyncDeleteRequest(path);
}
@Override
public void delete(String topic) throws PulsarAdminException {
delete(topic, false);
}
@Override
public CompletableFuture<Void> deleteAsync(String topic) {
return deleteAsync(topic, false);
}
@Override
public void delete(String topic, boolean force, boolean deleteSchema) throws PulsarAdminException {
try {
deleteAsync(topic, force, deleteSchema).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> deleteAsync(String topic, boolean force, boolean deleteSchema) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn) //
.queryParam("force", Boolean.toString(force)) //
.queryParam("deleteSchema", Boolean.toString(deleteSchema));
return asyncDeleteRequest(path);
}
@Override
public void unload(String topic) throws PulsarAdminException {
try {
unloadAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> unloadAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "unload");
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public MessageId terminateTopic(String topic) throws PulsarAdminException {
try {
return terminateTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<MessageId> terminateTopicAsync(String topic) {
TopicName tn = validateTopic(topic);
final CompletableFuture<MessageId> future = new CompletableFuture<>();
try {
final WebTarget path = topicPath(tn, "terminate");
request(path).async().post(Entity.entity("", MediaType.APPLICATION_JSON),
new InvocationCallback<MessageIdImpl>() {
@Override
public void completed(MessageIdImpl messageId) {
future.complete(messageId);
}
@Override
public void failed(Throwable throwable) {
log.warn("[{}] Failed to perform http post request: {}", path.getUri(),
throwable.getMessage());
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
} catch (PulsarAdminException cae) {
future.completeExceptionally(cae);
}
return future;
}
@Override
public List<String> getSubscriptions(String topic) throws PulsarAdminException {
try {
return getSubscriptionsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<List<String>> getSubscriptionsAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "subscriptions");
final CompletableFuture<List<String>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<List<String>>() {
@Override
public void completed(List<String> response) {
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public TopicStats getStats(String topic, boolean getPreciseBacklog,
boolean subscriptionBacklogSize) throws PulsarAdminException {
try {
return getStatsAsync(topic, getPreciseBacklog, subscriptionBacklogSize)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<TopicStats> getStatsAsync(String topic, boolean getPreciseBacklog,
boolean subscriptionBacklogSize) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "stats")
.queryParam("getPreciseBacklog", getPreciseBacklog)
.queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
final CompletableFuture<TopicStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TopicStats>() {
@Override
public void completed(TopicStats response) {
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public PersistentTopicInternalStats getInternalStats(String topic) throws PulsarAdminException {
return getInternalStats(topic, false);
}
@Override
public PersistentTopicInternalStats getInternalStats(String topic, boolean metadata) throws PulsarAdminException {
try {
return getInternalStatsAsync(topic, metadata).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic) {
return getInternalStatsAsync(topic, false);
}
@Override
public CompletableFuture<PersistentTopicInternalStats> getInternalStatsAsync(String topic, boolean metadata) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "internalStats");
path = path.queryParam("metadata", metadata);
final CompletableFuture<PersistentTopicInternalStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PersistentTopicInternalStats>() {
@Override
public void completed(PersistentTopicInternalStats response) {
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public JsonObject getInternalInfo(String topic) throws PulsarAdminException {
try {
return getInternalInfoAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<JsonObject> getInternalInfoAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "internal-info");
final CompletableFuture<JsonObject> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<String>() {
@Override
public void completed(String response) {
JsonObject json = new Gson().fromJson(response, JsonObject.class);
future.complete(json);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public PartitionedTopicStats getPartitionedStats(String topic, boolean perPartition, boolean getPreciseBacklog,
boolean subscriptionBacklogSize)
throws PulsarAdminException {
try {
return getPartitionedStatsAsync(topic, perPartition, getPreciseBacklog, subscriptionBacklogSize)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(String topic,
boolean perPartition, boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-stats");
path = path.queryParam("perPartition", perPartition)
.queryParam("getPreciseBacklog", getPreciseBacklog)
.queryParam("subscriptionBacklogSize", subscriptionBacklogSize);
final CompletableFuture<PartitionedTopicStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PartitionedTopicStats>() {
@Override
public void completed(PartitionedTopicStats response) {
if (!perPartition) {
response.partitions.clear();
}
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public PartitionedTopicInternalStats getPartitionedInternalStats(String topic)
throws PulsarAdminException {
try {
return getPartitionedInternalStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<PartitionedTopicInternalStats> getPartitionedInternalStatsAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "partitioned-internalStats");
final CompletableFuture<PartitionedTopicInternalStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PartitionedTopicInternalStats>() {
@Override
public void completed(PartitionedTopicInternalStats response) {
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void deleteSubscription(String topic, String subName) throws PulsarAdminException {
try {
deleteSubscriptionAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public void deleteSubscription(String topic, String subName, boolean force) throws PulsarAdminException {
try {
deleteSubscriptionAsync(topic, subName, force).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> deleteSubscriptionAsync(String topic, String subName) {
return deleteSubscriptionAsync(topic, subName, false);
}
@Override
public CompletableFuture<Void> deleteSubscriptionAsync(String topic, String subName, boolean force) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName);
path = path.queryParam("force", force);
return asyncDeleteRequest(path);
}
@Override
public void skipAllMessages(String topic, String subName) throws PulsarAdminException {
try {
skipAllMessagesAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> skipAllMessagesAsync(String topic, String subName) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName, "skip_all");
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public void skipMessages(String topic, String subName, long numMessages) throws PulsarAdminException {
try {
skipMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> skipMessagesAsync(String topic, String subName, long numMessages) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName, "skip", String.valueOf(numMessages));
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public void expireMessages(String topic, String subName, long expireTimeInSeconds) throws PulsarAdminException {
try {
expireMessagesAsync(topic, subName, expireTimeInSeconds).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> expireMessagesAsync(String topic, String subName, long expireTimeInSeconds) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName,
"expireMessages", String.valueOf(expireTimeInSeconds));
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public void expireMessages(String topic, String subscriptionName, MessageId messageId, boolean isExcluded)
throws PulsarAdminException {
try {
expireMessagesAsync(topic, subscriptionName, messageId, isExcluded)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> expireMessagesAsync(String topic, String subscriptionName,
MessageId messageId, boolean isExcluded) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subscriptionName);
ResetCursorData resetCursorData = new ResetCursorData(messageId);
resetCursorData.setExcluded(isExcluded);
WebTarget path = topicPath(tn, "subscription", encodedSubName, "expireMessages");
return asyncPostRequest(path, Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
}
@Override
public void expireMessagesForAllSubscriptions(String topic, long expireTimeInSeconds) throws PulsarAdminException {
try {
expireMessagesForAllSubscriptionsAsync(topic, expireTimeInSeconds)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> expireMessagesForAllSubscriptionsAsync(String topic, long expireTimeInSeconds) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "all_subscription",
"expireMessages", String.valueOf(expireTimeInSeconds));
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
private CompletableFuture<List<Message<byte[]>>> peekNthMessage(String topic, String subName, int messagePosition) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName,
"position", String.valueOf(messagePosition));
final CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(tn.toString(), response));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public List<Message<byte[]>> peekMessages(String topic, String subName, int numMessages)
throws PulsarAdminException {
try {
return peekMessagesAsync(topic, subName, numMessages).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<List<Message<byte[]>>> peekMessagesAsync(String topic, String subName, int numMessages) {
checkArgument(numMessages > 0);
CompletableFuture<List<Message<byte[]>>> future = new CompletableFuture<List<Message<byte[]>>>();
peekMessagesAsync(topic, subName, numMessages, Lists.newArrayList(), future, 1);
return future;
}
private void peekMessagesAsync(String topic, String subName, int numMessages,
List<Message<byte[]>> messages, CompletableFuture<List<Message<byte[]>>> future, int nthMessage) {
if (numMessages <= 0) {
future.complete(messages);
return;
}
// if peeking first message succeeds, we know that the topic and subscription exists
peekNthMessage(topic, subName, nthMessage).handle((r, ex) -> {
if (ex != null) {
// if we get a not found exception, it means that the position for the message we are trying to get
// does not exist. At this point, we can return the already found messages.
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occurred while trying to peek Messages.", ex.getMessage());
future.complete(messages);
} else {
future.completeExceptionally(ex);
}
return null;
}
for (int i = 0; i < Math.min(r.size(), numMessages); i++) {
messages.add(r.get(i));
}
peekMessagesAsync(topic, subName, numMessages - r.size(), messages, future, nthMessage + 1);
return null;
});
}
@Override
public Message<byte[]> examineMessage(String topic, String initialPosition, long messagePosition)
throws PulsarAdminException {
try {
return examineMessageAsync(topic, initialPosition, messagePosition)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Message<byte[]>> examineMessageAsync(String topic, String initialPosition,
long messagePosition) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "examinemessage")
.queryParam("initialPosition", initialPosition)
.queryParam("messagePosition", messagePosition);
final CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
List<Message<byte[]>> messages = getMessagesFromHttpResponse(tn.toString(), response);
if (messages.size() > 0) {
future.complete(messages.get(0));
} else {
future.complete(null);
}
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public CompletableFuture<Message<byte[]>> getMessageByIdAsync(String topic, long ledgerId, long entryId) {
CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
getRemoteMessageById(topic, ledgerId, entryId).handle((r, ex) -> {
if (ex != null) {
if (ex instanceof NotFoundException) {
log.warn("Exception '{}' occurred while trying to get message.", ex.getMessage());
future.complete(r);
} else {
future.completeExceptionally(ex);
}
return null;
}
future.complete(r);
return null;
});
return future;
}
private CompletableFuture<Message<byte[]>> getRemoteMessageById(String topic, long ledgerId, long entryId) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "ledger", Long.toString(ledgerId), "entry", Long.toString(entryId));
final CompletableFuture<Message<byte[]>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Response>() {
@Override
public void completed(Response response) {
try {
future.complete(getMessagesFromHttpResponse(topicName.toString(), response).get(0));
} catch (Exception e) {
future.completeExceptionally(getApiException(e));
}
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public Message<byte[]> getMessageById(String topic, long ledgerId, long entryId)
throws PulsarAdminException {
try {
return getMessageByIdAsync(topic, ledgerId, entryId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public void createSubscription(String topic, String subscriptionName, MessageId messageId)
throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subscriptionName);
WebTarget path = topicPath(tn, "subscription", encodedSubName);
request(path).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public CompletableFuture<Void> createSubscriptionAsync(String topic, String subscriptionName,
MessageId messageId) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subscriptionName);
WebTarget path = topicPath(tn, "subscription", encodedSubName);
return asyncPutRequest(path, Entity.entity(messageId, MediaType.APPLICATION_JSON));
}
@Override
public void resetCursor(String topic, String subName, long timestamp) throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName,
"resetcursor", String.valueOf(timestamp));
request(path).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public CompletableFuture<Void> resetCursorAsync(String topic, String subName, long timestamp) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
WebTarget path = topicPath(tn, "subscription", encodedSubName,
"resetcursor", String.valueOf(timestamp));
return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public void resetCursor(String topic, String subName, MessageId messageId) throws PulsarAdminException {
try {
resetCursorAsync(topic, subName, messageId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void resetCursor(String topic, String subName, MessageId messageId
, boolean isExcluded) throws PulsarAdminException {
try {
resetCursorAsync(topic, subName, messageId, isExcluded).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId) {
return resetCursorAsync(topic, subName, messageId, false);
}
@Override
public CompletableFuture<Void> resetCursorAsync(String topic, String subName
, MessageId messageId, boolean isExcluded) {
TopicName tn = validateTopic(topic);
String encodedSubName = Codec.encode(subName);
final WebTarget path = topicPath(tn, "subscription", encodedSubName, "resetcursor");
ResetCursorData resetCursorData = new ResetCursorData(messageId);
resetCursorData.setExcluded(isExcluded);
return asyncPostRequest(path, Entity.entity(resetCursorData, MediaType.APPLICATION_JSON));
}
@Override
public void triggerCompaction(String topic) throws PulsarAdminException {
try {
triggerCompactionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> triggerCompactionAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "compaction");
return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON));
}
@Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
try {
return compactionStatusAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<LongRunningProcessStatus> compactionStatusAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "compaction");
final CompletableFuture<LongRunningProcessStatus> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<LongRunningProcessStatus>() {
@Override
public void completed(LongRunningProcessStatus longRunningProcessStatus) {
future.complete(longRunningProcessStatus);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException {
try {
triggerOffloadAsync(topic, messageId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> triggerOffloadAsync(String topic, MessageId messageId) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "offload");
final CompletableFuture<Void> future = new CompletableFuture<>();
try {
request(path).async().put(Entity.entity(messageId, MediaType.APPLICATION_JSON)
, new InvocationCallback<MessageIdImpl>() {
@Override
public void completed(MessageIdImpl response) {
future.complete(null);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
} catch (PulsarAdminException cae) {
future.completeExceptionally(cae);
}
return future;
}
@Override
public OffloadProcessStatus offloadStatus(String topic)
throws PulsarAdminException {
try {
return offloadStatusAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<OffloadProcessStatus> offloadStatusAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "offload");
final CompletableFuture<OffloadProcessStatus> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<OffloadProcessStatus>() {
@Override
public void completed(OffloadProcessStatus offloadProcessStatus) {
future.complete(offloadProcessStatus);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
private WebTarget namespacePath(String domain, NamespaceName namespace, String... parts) {
final WebTarget base = namespace.isV2() ? adminV2Topics : adminTopics;
WebTarget namespacePath = base.path(domain).path(namespace.toString());
namespacePath = WebTargets.addParts(namespacePath, parts);
return namespacePath;
}
private WebTarget topicPath(TopicName topic, String... parts) {
final WebTarget base = topic.isV2() ? adminV2Topics : adminTopics;
WebTarget topicPath = base.path(topic.getRestPath());
topicPath = WebTargets.addParts(topicPath, parts);
return topicPath;
}
/*
* returns topic name with encoded Local Name
*/
private TopicName validateTopic(String topic) {
// Parsing will throw exception if name is not valid
return TopicName.get(topic);
}
private List<Message<byte[]>> getMessagesFromHttpResponse(String topic, Response response) throws Exception {
if (response.getStatus() != Status.OK.getStatusCode()) {
throw getApiException(response);
}
String msgId = response.getHeaderString(MESSAGE_ID);
MessageMetadata messageMetadata = new MessageMetadata();
try (InputStream stream = (InputStream) response.getEntity()) {
byte[] data = new byte[stream.available()];
stream.read(data);
Map<String, String> properties = Maps.newTreeMap();
MultivaluedMap<String, Object> headers = response.getHeaders();
Object tmp = headers.getFirst(PUBLISH_TIME);
if (tmp != null) {
properties.put("publish-time", (String) tmp);
}
tmp = headers.getFirst("X-Pulsar-null-value");
if (tmp != null) {
messageMetadata.setNullValue(Boolean.parseBoolean(tmp.toString()));
}
tmp = headers.getFirst(BATCH_HEADER);
if (response.getHeaderString(BATCH_HEADER) != null) {
properties.put(BATCH_HEADER, (String) tmp);
return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
}
for (Entry<String, List<Object>> entry : headers.entrySet()) {
String header = entry.getKey();
if (header.contains("X-Pulsar-PROPERTY-")) {
String keyName = header.substring("X-Pulsar-PROPERTY-".length());
properties.put(keyName, (String) entry.getValue().get(0));
}
}
return Collections.singletonList(new MessageImpl<byte[]>(topic, msgId, properties,
Unpooled.wrappedBuffer(data), Schema.BYTES, messageMetadata));
}
}
private List<Message<byte[]>> getIndividualMsgsFromBatch(String topic, String msgId, byte[] data,
Map<String, String> properties, MessageMetadata msgMetadataBuilder) {
List<Message<byte[]>> ret = new ArrayList<>();
int batchSize = Integer.parseInt(properties.get(BATCH_HEADER));
ByteBuf buf = Unpooled.wrappedBuffer(data);
for (int i = 0; i < batchSize; i++) {
String batchMsgId = msgId + ":" + i;
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
try {
ByteBuf singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(buf, singleMessageMetadata, i, batchSize);
if (singleMessageMetadata.getPropertiesCount() > 0) {
for (KeyValue entry : singleMessageMetadata.getPropertiesList()) {
properties.put(entry.getKey(), entry.getValue());
}
}
ret.add(new MessageImpl<>(topic, batchMsgId, properties, singleMessagePayload,
Schema.BYTES, msgMetadataBuilder));
} catch (Exception ex) {
log.error("Exception occurred while trying to get BatchMsgId: {}", batchMsgId, ex);
}
}
buf.release();
return ret;
}
@Override
public MessageId getLastMessageId(String topic) throws PulsarAdminException {
try {
return getLastMessageIdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "lastMessageId");
final CompletableFuture<MessageId> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<BatchMessageIdImpl>() {
@Override
public void completed(BatchMessageIdImpl response) {
if (response.getBatchIndex() == -1) {
future.complete(new MessageIdImpl(response.getLedgerId(),
response.getEntryId(), response.getPartitionIndex()));
}
future.complete(response);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic) throws PulsarAdminException {
return getBacklogQuotaMap(topic, false);
}
@Override
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String topic, boolean applied)
throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "backlogQuotaMap");
path = path.queryParam("applied", applied);
return request(path).get(new GenericType<Map<BacklogQuotaType, BacklogQuota>>() {
});
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "backlogQuota");
request(path).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void removeBacklogQuota(String topic) throws PulsarAdminException {
try {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "backlogQuota");
request(path.queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()))
.delete(ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public Integer getMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException {
return getMaxUnackedMessagesOnConsumer(topic, false);
}
@Override
public CompletableFuture<Integer> getMaxUnackedMessagesOnConsumerAsync(String topic) {
return getMaxUnackedMessagesOnConsumerAsync(topic, false);
}
@Override
public Integer getMaxUnackedMessagesOnConsumer(String topic, boolean applied) throws PulsarAdminException {
try {
return getMaxUnackedMessagesOnConsumerAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxUnackedMessagesOnConsumerAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer");
path = path.queryParam("applied", applied);
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxNum) {
future.complete(maxNum);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public CompletableFuture<Void> setMaxUnackedMessagesOnConsumerAsync(String topic, int maxNum) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer");
return asyncPostRequest(path, Entity.entity(maxNum, MediaType.APPLICATION_JSON));
}
@Override
public void setMaxUnackedMessagesOnConsumer(String topic, int maxNum) throws PulsarAdminException {
try {
setMaxUnackedMessagesOnConsumerAsync(topic, maxNum)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxUnackedMessagesOnConsumerAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnConsumer");
return asyncDeleteRequest(path);
}
@Override
public void removeMaxUnackedMessagesOnConsumer(String topic) throws PulsarAdminException {
try {
removeMaxUnackedMessagesOnConsumerAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public InactiveTopicPolicies getInactiveTopicPolicies(String topic, boolean applied) throws PulsarAdminException {
try {
return getInactiveTopicPoliciesAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
path = path.queryParam("applied", applied);
final CompletableFuture<InactiveTopicPolicies> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<InactiveTopicPolicies>() {
@Override
public void completed(InactiveTopicPolicies inactiveTopicPolicies) {
future.complete(inactiveTopicPolicies);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public InactiveTopicPolicies getInactiveTopicPolicies(String topic) throws PulsarAdminException {
return getInactiveTopicPolicies(topic, false);
}
@Override
public CompletableFuture<InactiveTopicPolicies> getInactiveTopicPoliciesAsync(String topic) {
return getInactiveTopicPoliciesAsync(topic, false);
}
@Override
public CompletableFuture<Void> setInactiveTopicPoliciesAsync(String topic
, InactiveTopicPolicies inactiveTopicPolicies) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
return asyncPostRequest(path, Entity.entity(inactiveTopicPolicies, MediaType.APPLICATION_JSON));
}
@Override
public void setInactiveTopicPolicies(String topic
, InactiveTopicPolicies inactiveTopicPolicies) throws PulsarAdminException {
try {
setInactiveTopicPoliciesAsync(topic, inactiveTopicPolicies)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeInactiveTopicPoliciesAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "inactiveTopicPolicies");
return asyncDeleteRequest(path);
}
@Override
public void removeInactiveTopicPolicies(String topic) throws PulsarAdminException {
try {
removeInactiveTopicPoliciesAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic
, boolean applied) throws PulsarAdminException {
try {
return getDelayedDeliveryPolicyAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic
, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "delayedDelivery");
path = path.queryParam("applied", applied);
final CompletableFuture<DelayedDeliveryPolicies> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<DelayedDeliveryPolicies>() {
@Override
public void completed(DelayedDeliveryPolicies delayedDeliveryPolicies) {
future.complete(delayedDeliveryPolicies);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public DelayedDeliveryPolicies getDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
return getDelayedDeliveryPolicy(topic, false);
}
@Override
public CompletableFuture<DelayedDeliveryPolicies> getDelayedDeliveryPolicyAsync(String topic) {
return getDelayedDeliveryPolicyAsync(topic, false);
}
@Override
public CompletableFuture<Void> removeDelayedDeliveryPolicyAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "delayedDelivery");
return asyncDeleteRequest(path);
}
@Override
public void removeDelayedDeliveryPolicy(String topic) throws PulsarAdminException {
try {
removeDelayedDeliveryPolicyAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setDelayedDeliveryPolicyAsync(String topic
, DelayedDeliveryPolicies delayedDeliveryPolicies) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "delayedDelivery");
return asyncPostRequest(path, Entity.entity(delayedDeliveryPolicies, MediaType.APPLICATION_JSON));
}
@Override
public void setDelayedDeliveryPolicy(String topic
, DelayedDeliveryPolicies delayedDeliveryPolicies) throws PulsarAdminException {
try {
setDelayedDeliveryPolicyAsync(topic, delayedDeliveryPolicies)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public Boolean getDeduplicationEnabled(String topic) throws PulsarAdminException {
try {
return getDeduplicationEnabledAsync(topic).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Boolean> getDeduplicationEnabledAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "deduplicationEnabled");
final CompletableFuture<Boolean> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<Boolean>() {
@Override
public void completed(Boolean enabled) {
future.complete(enabled);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public Boolean getDeduplicationStatus(String topic) throws PulsarAdminException {
return getDeduplicationStatus(topic, false);
}
@Override
public CompletableFuture<Boolean> getDeduplicationStatusAsync(String topic) {
return getDeduplicationStatusAsync(topic, false);
}
@Override
public Boolean getDeduplicationStatus(String topic, boolean applied) throws PulsarAdminException {
try {
return getDeduplicationStatusAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Boolean> getDeduplicationStatusAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "deduplicationEnabled");
path = path.queryParam("applied", applied);
final CompletableFuture<Boolean> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<Boolean>() {
@Override
public void completed(Boolean enabled) {
future.complete(enabled);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void enableDeduplication(String topic, boolean enabled) throws PulsarAdminException {
try {
enableDeduplicationAsync(topic, enabled).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> enableDeduplicationAsync(String topic, boolean enabled) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "deduplicationEnabled");
return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON));
}
@Override
public void setDeduplicationStatus(String topic, boolean enabled) throws PulsarAdminException {
try {
enableDeduplicationAsync(topic, enabled).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setDeduplicationStatusAsync(String topic, boolean enabled) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "deduplicationEnabled");
return asyncPostRequest(path, Entity.entity(enabled, MediaType.APPLICATION_JSON));
}
@Override
public void disableDeduplication(String topic) throws PulsarAdminException {
try {
disableDeduplicationAsync(topic).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> disableDeduplicationAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "deduplicationEnabled");
return asyncDeleteRequest(path);
}
@Override
public void removeDeduplicationStatus(String topic) throws PulsarAdminException {
try {
removeDeduplicationStatusAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeDeduplicationStatusAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "deduplicationEnabled");
return asyncDeleteRequest(path);
}
@Override
public OffloadPolicies getOffloadPolicies(String topic) throws PulsarAdminException {
return getOffloadPolicies(topic, false);
}
@Override
public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic) {
return getOffloadPoliciesAsync(topic, false);
}
@Override
public OffloadPolicies getOffloadPolicies(String topic, boolean applied) throws PulsarAdminException {
try {
return getOffloadPoliciesAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<OffloadPolicies> getOffloadPoliciesAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "offloadPolicies");
path = path.queryParam("applied", applied);
final CompletableFuture<OffloadPolicies> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<OffloadPolicies>() {
@Override
public void completed(OffloadPolicies offloadPolicies) {
future.complete(offloadPolicies);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setOffloadPolicies(String topic, OffloadPolicies offloadPolicies) throws PulsarAdminException {
try {
setOffloadPoliciesAsync(topic, offloadPolicies).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setOffloadPoliciesAsync(String topic, OffloadPolicies offloadPolicies) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "offloadPolicies");
return asyncPostRequest(path, Entity.entity(offloadPolicies, MediaType.APPLICATION_JSON));
}
@Override
public void removeOffloadPolicies(String topic) throws PulsarAdminException {
try {
removeOffloadPoliciesAsync(topic).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeOffloadPoliciesAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "offloadPolicies");
return asyncDeleteRequest(path);
}
@Override
public Integer getMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
return getMaxUnackedMessagesOnSubscription(topic, false);
}
@Override
public CompletableFuture<Integer> getMaxUnackedMessagesOnSubscriptionAsync(String topic) {
return getMaxUnackedMessagesOnSubscriptionAsync(topic, false);
}
@Override
public Integer getMaxUnackedMessagesOnSubscription(String topic, boolean applied) throws PulsarAdminException {
try {
return getMaxUnackedMessagesOnSubscriptionAsync(topic, applied).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxUnackedMessagesOnSubscriptionAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
path = path.queryParam("applied", applied);
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path, new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxNum) {
future.complete(maxNum);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setMaxUnackedMessagesOnSubscription(String topic, int maxNum) throws PulsarAdminException {
try {
setMaxUnackedMessagesOnSubscriptionAsync(topic, maxNum).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setMaxUnackedMessagesOnSubscriptionAsync(String topic, int maxNum) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
return asyncPostRequest(path, Entity.entity(maxNum, MediaType.APPLICATION_JSON));
}
@Override
public void removeMaxUnackedMessagesOnSubscription(String topic) throws PulsarAdminException {
try {
removeMaxUnackedMessagesOnSubscriptionAsync(topic).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxUnackedMessagesOnSubscriptionAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxUnackedMessagesOnSubscription");
return asyncDeleteRequest(path);
}
@Override
public void setMessageTTL(String topic, int messageTTLInSecond) throws PulsarAdminException {
try {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "messageTTL");
request(path.queryParam("messageTTL", messageTTLInSecond)).
post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public Integer getMessageTTL(String topic) throws PulsarAdminException {
return getMessageTTL(topic, false);
}
@Override
public Integer getMessageTTL(String topic, boolean applied) throws PulsarAdminException {
try {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "messageTTL");
path = path.queryParam("applied", applied);
return request(path).get(new GenericType<Integer>() {});
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void removeMessageTTL(String topic) throws PulsarAdminException {
try {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "messageTTL");
request(path.queryParam("messageTTL", 0)).delete(ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}
@Override
public void setRetention(String topic, RetentionPolicies retention) throws PulsarAdminException {
try {
setRetentionAsync(topic, retention).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setRetentionAsync(String topic, RetentionPolicies retention) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "retention");
return asyncPostRequest(path, Entity.entity(retention, MediaType.APPLICATION_JSON));
}
@Override
public RetentionPolicies getRetention(String topic) throws PulsarAdminException {
return getRetention(topic, false);
}
@Override
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic) {
return getRetentionAsync(topic, false);
}
@Override
public RetentionPolicies getRetention(String topic, boolean applied) throws PulsarAdminException {
try {
return getRetentionAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<RetentionPolicies> getRetentionAsync(String topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "retention");
path = path.queryParam("applied", applied);
final CompletableFuture<RetentionPolicies> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<RetentionPolicies>() {
@Override
public void completed(RetentionPolicies retentionPolicies) {
future.complete(retentionPolicies);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void removeRetention(String topic) throws PulsarAdminException {
try {
removeRetentionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeRetentionAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "retention");
return asyncDeleteRequest(path);
}
@Override
public void setPersistence(String topic, PersistencePolicies persistencePolicies) throws PulsarAdminException {
try {
setPersistenceAsync(topic, persistencePolicies).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setPersistenceAsync(String topic, PersistencePolicies persistencePolicies) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "persistence");
return asyncPostRequest(path, Entity.entity(persistencePolicies, MediaType.APPLICATION_JSON));
}
@Override
public PersistencePolicies getPersistence(String topic) throws PulsarAdminException {
return getPersistence(topic, false);
}
@Override
public CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic) {
return getPersistenceAsync(topic, false);
}
@Override
public PersistencePolicies getPersistence(String topic, boolean applied) throws PulsarAdminException {
try {
return getPersistenceAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<PersistencePolicies> getPersistenceAsync(String topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "persistence");
path = path.queryParam("applied", applied);
final CompletableFuture<PersistencePolicies> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PersistencePolicies>() {
@Override
public void completed(PersistencePolicies persistencePolicies) {
future.complete(persistencePolicies);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void removePersistence(String topic) throws PulsarAdminException {
try {
removePersistenceAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removePersistenceAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "persistence");
return asyncDeleteRequest(path);
}
@Override
public DispatchRate getDispatchRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getDispatchRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<DispatchRate> getDispatchRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "dispatchRate");
path = path.queryParam("applied", applied);
final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<DispatchRate>() {
@Override
public void completed(DispatchRate dispatchRate) {
future.complete(dispatchRate);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public DispatchRate getDispatchRate(String topic) throws PulsarAdminException {
return getDispatchRate(topic, false);
}
@Override
public CompletableFuture<DispatchRate> getDispatchRateAsync(String topic) {
return getDispatchRateAsync(topic, false);
}
@Override
public void setDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
try {
setDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setDispatchRateAsync(String topic, DispatchRate dispatchRate) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "dispatchRate");
return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
}
@Override
public void removeDispatchRate(String topic) throws PulsarAdminException {
try {
removeDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeDispatchRateAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "dispatchRate");
return asyncDeleteRequest(path);
}
@Override
public DispatchRate getSubscriptionDispatchRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getSubscriptionDispatchRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
path = path.queryParam("applied", applied);
final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<DispatchRate>() {
@Override
public void completed(DispatchRate dispatchRate) {
future.complete(dispatchRate);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public DispatchRate getSubscriptionDispatchRate(String topic) throws PulsarAdminException {
return getSubscriptionDispatchRate(topic, false);
}
@Override
public CompletableFuture<DispatchRate> getSubscriptionDispatchRateAsync(String topic) {
return getSubscriptionDispatchRateAsync(topic, false);
}
@Override
public void setSubscriptionDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
try {
setSubscriptionDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setSubscriptionDispatchRateAsync(String topic, DispatchRate dispatchRate) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
}
@Override
public void removeSubscriptionDispatchRate(String topic) throws PulsarAdminException {
try {
removeSubscriptionDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeSubscriptionDispatchRateAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscriptionDispatchRate");
return asyncDeleteRequest(path);
}
@Override
public Long getCompactionThreshold(String topic) throws PulsarAdminException {
try {
return getCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Long> getCompactionThresholdAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "compactionThreshold");
final CompletableFuture<Long> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Long>() {
@Override
public void completed(Long compactionThreshold) {
future.complete(compactionThreshold);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setCompactionThreshold(String topic, long compactionThreshold) throws PulsarAdminException {
try {
setCompactionThresholdAsync(topic, compactionThreshold).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setCompactionThresholdAsync(String topic, long compactionThreshold) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "compactionThreshold");
return asyncPostRequest(path, Entity.entity(compactionThreshold, MediaType.APPLICATION_JSON));
}
@Override
public void removeCompactionThreshold(String topic) throws PulsarAdminException {
try {
removeCompactionThresholdAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeCompactionThresholdAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "compactionThreshold");
return asyncDeleteRequest(path);
}
@Override
public PublishRate getPublishRate(String topic) throws PulsarAdminException {
try {
return getPublishRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<PublishRate> getPublishRateAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "publishRate");
final CompletableFuture<PublishRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<PublishRate>() {
@Override
public void completed(PublishRate publishRate) {
future.complete(publishRate);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setPublishRate(String topic, PublishRate publishRate) throws PulsarAdminException {
try {
setPublishRateAsync(topic, publishRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setPublishRateAsync(String topic, PublishRate publishRate) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "publishRate");
return asyncPostRequest(path, Entity.entity(publishRate, MediaType.APPLICATION_JSON));
}
@Override
public void removePublishRate(String topic) throws PulsarAdminException {
try {
removePublishRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removePublishRateAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "publishRate");
return asyncDeleteRequest(path);
}
@Override
public Integer getMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
try {
return getMaxConsumersPerSubscriptionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxConsumersPerSubscriptionAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxConsumersPerSubscription) {
future.complete(maxConsumersPerSubscription);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setMaxConsumersPerSubscription(String topic, int maxConsumersPerSubscription)
throws PulsarAdminException {
try {
setMaxConsumersPerSubscriptionAsync(topic, maxConsumersPerSubscription)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String topic, int maxConsumersPerSubscription) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
return asyncPostRequest(path, Entity.entity(maxConsumersPerSubscription, MediaType.APPLICATION_JSON));
}
@Override
public void removeMaxConsumersPerSubscription(String topic) throws PulsarAdminException {
try {
removeMaxConsumersPerSubscriptionAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "maxConsumersPerSubscription");
return asyncDeleteRequest(path);
}
@Override
public Integer getMaxProducers(String topic) throws PulsarAdminException {
return getMaxProducers(topic, false);
}
@Override
public CompletableFuture<Integer> getMaxProducersAsync(String topic) {
return getMaxProducersAsync(topic, false);
}
@Override
public Integer getMaxProducers(String topic, boolean applied) throws PulsarAdminException {
try {
return getMaxProducersAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxProducersAsync(String topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxProducers");
path = path.queryParam("applied", applied);
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxProducers) {
future.complete(maxProducers);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setMaxProducers(String topic, int maxProducers) throws PulsarAdminException {
try {
setMaxProducersAsync(topic, maxProducers).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setMaxProducersAsync(String topic, int maxProducers) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxProducers");
return asyncPostRequest(path, Entity.entity(maxProducers, MediaType.APPLICATION_JSON));
}
@Override
public void removeMaxProducers(String topic) throws PulsarAdminException {
try {
removeMaxProducersAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxProducersAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxProducers");
return asyncDeleteRequest(path);
}
@Override
public Integer getMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException {
try {
return getMaxSubscriptionsPerTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxSubscriptionsPerTopicAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxSubscriptionsPerTopic) {
future.complete(maxSubscriptionsPerTopic);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setMaxSubscriptionsPerTopic(String topic, int maxSubscriptionsPerTopic) throws PulsarAdminException {
try {
setMaxSubscriptionsPerTopicAsync(topic, maxSubscriptionsPerTopic)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setMaxSubscriptionsPerTopicAsync(String topic, int maxSubscriptionsPerTopic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
return asyncPostRequest(path, Entity.entity(maxSubscriptionsPerTopic, MediaType.APPLICATION_JSON));
}
@Override
public void removeMaxSubscriptionsPerTopic(String topic) throws PulsarAdminException {
try {
removeMaxSubscriptionsPerTopicAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxSubscriptionsPerTopicAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxSubscriptionsPerTopic");
return asyncDeleteRequest(path);
}
@Override
public Integer getMaxMessageSize(String topic) throws PulsarAdminException {
try {
return getMaxMessageSizeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxMessageSizeAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxMessageSize");
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxMessageSize) {
future.complete(maxMessageSize);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setMaxMessageSize(String topic, int maxMessageSize) throws PulsarAdminException {
try {
setMaxMessageSizeAsync(topic, maxMessageSize).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setMaxMessageSizeAsync(String topic, int maxMessageSize) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxMessageSize");
return asyncPostRequest(path, Entity.entity(maxMessageSize, MediaType.APPLICATION_JSON));
}
@Override
public void removeMaxMessageSize(String topic) throws PulsarAdminException {
try {
removeMaxMessageSizeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxMessageSizeAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxMessageSize");
return asyncDeleteRequest(path);
}
@Override
public Integer getMaxConsumers(String topic) throws PulsarAdminException {
return getMaxConsumers(topic, false);
}
@Override
public CompletableFuture<Integer> getMaxConsumersAsync(String topic) {
return getMaxConsumersAsync(topic, false);
}
@Override
public Integer getMaxConsumers(String topic, boolean applied) throws PulsarAdminException {
try {
return getMaxConsumersAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getMaxConsumersAsync(String topic, boolean applied) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxConsumers");
path = path.queryParam("applied", applied);
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Integer>() {
@Override
public void completed(Integer maxProducers) {
future.complete(maxProducers);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setMaxConsumers(String topic, int maxConsumers) throws PulsarAdminException {
try {
setMaxConsumersAsync(topic, maxConsumers).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setMaxConsumersAsync(String topic, int maxConsumers) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxConsumers");
return asyncPostRequest(path, Entity.entity(maxConsumers, MediaType.APPLICATION_JSON));
}
@Override
public void removeMaxConsumers(String topic) throws PulsarAdminException {
try {
removeMaxConsumersAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeMaxConsumersAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "maxConsumers");
return asyncDeleteRequest(path);
}
@Override
public Integer getDeduplicationSnapshotInterval(String topic) throws PulsarAdminException {
try {
return getDeduplicationSnapshotIntervalAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Integer> getDeduplicationSnapshotIntervalAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "deduplicationSnapshotInterval");
final CompletableFuture<Integer> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Integer>() {
@Override
public void completed(Integer interval) {
future.complete(interval);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setDeduplicationSnapshotInterval(String topic, int interval) throws PulsarAdminException {
try {
setDeduplicationSnapshotIntervalAsync(topic, interval).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setDeduplicationSnapshotIntervalAsync(String topic, int interval) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "deduplicationSnapshotInterval");
return asyncPostRequest(path, Entity.entity(interval, MediaType.APPLICATION_JSON));
}
@Override
public void removeDeduplicationSnapshotInterval(String topic) throws PulsarAdminException {
try {
removeDeduplicationSnapshotIntervalAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeDeduplicationSnapshotIntervalAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "deduplicationSnapshotInterval");
return asyncDeleteRequest(path);
}
@Override
public void setSubscriptionTypesEnabled(
String topic, Set<SubscriptionType>
subscriptionTypesEnabled) throws PulsarAdminException {
try {
setSubscriptionTypesEnabledAsync(topic, subscriptionTypesEnabled)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setSubscriptionTypesEnabledAsync(String topic,
Set<SubscriptionType> subscriptionTypesEnabled) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "subscriptionTypesEnabled");
return asyncPostRequest(path, Entity.entity(subscriptionTypesEnabled, MediaType.APPLICATION_JSON));
}
@Override
public Set<SubscriptionType> getSubscriptionTypesEnabled(String topic) throws PulsarAdminException {
try {
return getSubscriptionTypesEnabledAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Set<SubscriptionType>> getSubscriptionTypesEnabledAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscriptionTypesEnabled");
final CompletableFuture<Set<SubscriptionType>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Set<SubscriptionType>>() {
@Override
public void completed(Set<SubscriptionType> subscriptionTypesEnabled) {
future.complete(subscriptionTypesEnabled);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public DispatchRate getReplicatorDispatchRate(String topic) throws PulsarAdminException {
return getReplicatorDispatchRate(topic, false);
}
@Override
public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic) {
return getReplicatorDispatchRateAsync(topic, false);
}
@Override
public DispatchRate getReplicatorDispatchRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getReplicatorDispatchRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<DispatchRate> getReplicatorDispatchRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "replicatorDispatchRate");
path = path.queryParam("applied", applied);
final CompletableFuture<DispatchRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<DispatchRate>() {
@Override
public void completed(DispatchRate dispatchRate) {
future.complete(dispatchRate);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setReplicatorDispatchRate(String topic, DispatchRate dispatchRate) throws PulsarAdminException {
try {
setReplicatorDispatchRateAsync(topic, dispatchRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setReplicatorDispatchRateAsync(String topic, DispatchRate dispatchRate) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "replicatorDispatchRate");
return asyncPostRequest(path, Entity.entity(dispatchRate, MediaType.APPLICATION_JSON));
}
@Override
public void removeReplicatorDispatchRate(String topic) throws PulsarAdminException {
try {
removeReplicatorDispatchRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeReplicatorDispatchRateAsync(String topic) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "replicatorDispatchRate");
return asyncDeleteRequest(path);
}
@Override
public SubscribeRate getSubscribeRate(String topic) throws PulsarAdminException {
return getSubscribeRate(topic, false);
}
@Override
public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic) {
return getSubscribeRateAsync(topic, false);
}
@Override
public SubscribeRate getSubscribeRate(String topic, boolean applied) throws PulsarAdminException {
try {
return getSubscribeRateAsync(topic, applied).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<SubscribeRate> getSubscribeRateAsync(String topic, boolean applied) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscribeRate");
path = path.queryParam("applied", applied);
final CompletableFuture<SubscribeRate> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<SubscribeRate>() {
@Override
public void completed(SubscribeRate subscribeRate) {
future.complete(subscribeRate);
}
@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}
@Override
public void setSubscribeRate(String topic, SubscribeRate subscribeRate) throws PulsarAdminException {
try {
setSubscribeRateAsync(topic, subscribeRate).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> setSubscribeRateAsync(String topic, SubscribeRate subscribeRate) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscribeRate");
return asyncPostRequest(path, Entity.entity(subscribeRate, MediaType.APPLICATION_JSON));
}
@Override
public void removeSubscribeRate(String topic) throws PulsarAdminException {
try {
removeSubscribeRateAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
public CompletableFuture<Void> removeSubscribeRateAsync(String topic) {
TopicName topicName = validateTopic(topic);
WebTarget path = topicPath(topicName, "subscribeRate");
return asyncDeleteRequest(path);
}
private static final Logger log = LoggerFactory.getLogger(TopicsImpl.class);
}