blob: 02e44aca626046e4a9113bf2322f779dbb781a11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.client.admin.Clusters;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
public class ClustersImpl extends BaseResource implements Clusters {
private final WebTarget adminClusters;
public ClustersImpl(WebTarget web, Authentication auth, long readTimeoutMs) {
super(auth, readTimeoutMs);
adminClusters = web.path("/admin/v2/clusters");
}
@Override
public List<String> getClusters() throws PulsarAdminException {
return sync(this::getClustersAsync);
}
@Override
public CompletableFuture<List<String>> getClustersAsync() {
WebTarget path = this.adminClusters;
return asyncGetRequest(path, new FutureCallback<List<String>>(){});
}
@Override
public ClusterData getCluster(String cluster) throws PulsarAdminException {
return sync(() -> getClusterAsync(cluster));
}
@Override
public CompletableFuture<ClusterData> getClusterAsync(String cluster) {
WebTarget path = adminClusters.path(cluster);
return asyncGetRequest(path, new FutureCallback<ClusterDataImpl>(){})
.thenApply(clusterData -> clusterData);
}
@Override
public void createCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
sync(() -> createClusterAsync(cluster, clusterData));
}
@Override
public CompletableFuture<Void> createClusterAsync(String cluster, ClusterData clusterData) {
WebTarget path = adminClusters.path(cluster);
return asyncPutRequest(path, Entity.entity((ClusterDataImpl) clusterData, MediaType.APPLICATION_JSON));
}
@Override
public void updateCluster(String cluster, ClusterData clusterData) throws PulsarAdminException {
sync(() -> updateClusterAsync(cluster, clusterData));
}
@Override
public CompletableFuture<Void> updateClusterAsync(String cluster, ClusterData clusterData) {
WebTarget path = adminClusters.path(cluster);
return asyncPostRequest(path, Entity.entity((ClusterDataImpl) clusterData, MediaType.APPLICATION_JSON_TYPE));
}
@Override
public void updatePeerClusterNames(
String cluster, LinkedHashSet<String> peerClusterNames) throws PulsarAdminException {
sync(() -> updatePeerClusterNamesAsync(cluster, peerClusterNames));
}
@Override
public CompletableFuture<Void> updatePeerClusterNamesAsync(String cluster, LinkedHashSet<String> peerClusterNames) {
WebTarget path = adminClusters.path(cluster).path("peers");
return asyncPostRequest(path, Entity.entity(peerClusterNames, MediaType.APPLICATION_JSON));
}
@Override
public void updateClusterMigration(String cluster, boolean isMigrated, ClusterUrl clusterUrl)
throws PulsarAdminException {
sync(() -> updateClusterMigrationAsync(cluster, isMigrated, clusterUrl));
}
@Override
public CompletableFuture<Void> updateClusterMigrationAsync(String cluster, boolean isMigrated,
ClusterUrl clusterUrl) {
WebTarget path = adminClusters.path(cluster).path("migrate").queryParam("migrated", isMigrated);
return asyncPostRequest(path, Entity.entity(clusterUrl, MediaType.APPLICATION_JSON));
}
@Override
@SuppressWarnings("unchecked")
public Set<String> getPeerClusterNames(String cluster) throws PulsarAdminException {
return sync(() -> getPeerClusterNamesAsync(cluster));
}
@Override
public CompletableFuture<Set<String>> getPeerClusterNamesAsync(String cluster) {
WebTarget path = adminClusters.path(cluster).path("peers");
return asyncGetRequest(path, new FutureCallback<Set<String>>(){});
}
@Override
public void deleteCluster(String cluster) throws PulsarAdminException {
sync(() -> deleteClusterAsync(cluster));
}
@Override
public CompletableFuture<Void> deleteClusterAsync(String cluster) {
WebTarget path = adminClusters.path(cluster);
return asyncDeleteRequest(path);
}
@Override
public Map<String, NamespaceIsolationData> getNamespaceIsolationPolicies(String cluster)
throws PulsarAdminException {
return sync(() -> getNamespaceIsolationPoliciesAsync(cluster));
}
@Override
public CompletableFuture<Map<String, NamespaceIsolationData>> getNamespaceIsolationPoliciesAsync(
String cluster) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies");
return asyncGetRequest(path, new FutureCallback<Map<String, NamespaceIsolationDataImpl>>() {})
.thenApply(HashMap::new);
}
@Override
public List<BrokerNamespaceIsolationData> getBrokersWithNamespaceIsolationPolicy(String cluster)
throws PulsarAdminException {
return sync(() -> getBrokersWithNamespaceIsolationPolicyAsync(cluster));
}
@Override
public CompletableFuture<List<BrokerNamespaceIsolationData>> getBrokersWithNamespaceIsolationPolicyAsync(
String cluster) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers");
return asyncGetRequest(path, new FutureCallback<List<BrokerNamespaceIsolationDataImpl>>() {})
.thenApply(ArrayList::new);
}
@Override
public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(String cluster, String broker)
throws PulsarAdminException {
return sync(() -> getBrokerWithNamespaceIsolationPolicyAsync(cluster, broker));
}
@Override
public CompletableFuture<BrokerNamespaceIsolationData> getBrokerWithNamespaceIsolationPolicyAsync(
String cluster, String broker) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path("brokers").path(broker);
return asyncGetRequest(path, new FutureCallback<BrokerNamespaceIsolationDataImpl>(){})
.thenApply(brokerNamespaceIsolationData -> brokerNamespaceIsolationData);
}
@Override
public void createNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
}
@Override
public CompletableFuture<Void> createNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
}
@Override
public void updateNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
setNamespaceIsolationPolicy(cluster, policyName, namespaceIsolationData);
}
@Override
public CompletableFuture<Void> updateNamespaceIsolationPolicyAsync(
String cluster, String policyName, NamespaceIsolationData namespaceIsolationData) {
return setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData);
}
@Override
public void deleteNamespaceIsolationPolicy(String cluster, String policyName) throws PulsarAdminException {
sync(() -> deleteNamespaceIsolationPolicyAsync(cluster, policyName));
}
@Override
public CompletableFuture<Void> deleteNamespaceIsolationPolicyAsync(String cluster, String policyName) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
return asyncDeleteRequest(path);
}
private void setNamespaceIsolationPolicy(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) throws PulsarAdminException {
sync(() -> setNamespaceIsolationPolicyAsync(cluster, policyName, namespaceIsolationData));
}
private CompletableFuture<Void> setNamespaceIsolationPolicyAsync(String cluster, String policyName,
NamespaceIsolationData namespaceIsolationData) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
return asyncPostRequest(path, Entity.entity(namespaceIsolationData, MediaType.APPLICATION_JSON));
}
@Override
public NamespaceIsolationData getNamespaceIsolationPolicy(String cluster, String policyName)
throws PulsarAdminException {
return sync(() -> getNamespaceIsolationPolicyAsync(cluster, policyName));
}
@Override
public CompletableFuture<NamespaceIsolationData> getNamespaceIsolationPolicyAsync(
String cluster, String policyName) {
WebTarget path = adminClusters.path(cluster).path("namespaceIsolationPolicies").path(policyName);
return asyncGetRequest(path, new FutureCallback<NamespaceIsolationDataImpl>(){})
.thenApply(namespaceIsolationData -> namespaceIsolationData);
}
@Override
public void createFailureDomain(String cluster, String domainName, FailureDomain domain)
throws PulsarAdminException {
setDomain(cluster, domainName, domain);
}
@Override
public CompletableFuture<Void> createFailureDomainAsync(String cluster, String domainName,
FailureDomain domain) {
return setDomainAsync(cluster, domainName, domain);
}
@Override
public void updateFailureDomain(String cluster, String domainName, FailureDomain domain)
throws PulsarAdminException {
setDomain(cluster, domainName, domain);
}
@Override
public CompletableFuture<Void> updateFailureDomainAsync(String cluster, String domainName,
FailureDomain domain) {
return setDomainAsync(cluster, domainName, domain);
}
@Override
public void deleteFailureDomain(String cluster, String domainName) throws PulsarAdminException {
sync(() -> deleteFailureDomainAsync(cluster, domainName));
}
@Override
public CompletableFuture<Void> deleteFailureDomainAsync(String cluster, String domainName) {
WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName);
return asyncDeleteRequest(path);
}
@Override
public Map<String, FailureDomain> getFailureDomains(String cluster) throws PulsarAdminException {
return sync(() -> getFailureDomainsAsync(cluster));
}
@Override
public CompletableFuture<Map<String, FailureDomain>> getFailureDomainsAsync(String cluster) {
WebTarget path = adminClusters.path(cluster).path("failureDomains");
return asyncGetRequest(path, new FutureCallback<Map<String, FailureDomainImpl>>() {})
.thenApply(HashMap::new);
}
@Override
public FailureDomain getFailureDomain(String cluster, String domainName) throws PulsarAdminException {
return sync(() -> getFailureDomainAsync(cluster, domainName));
}
@Override
public CompletableFuture<FailureDomain> getFailureDomainAsync(String cluster, String domainName) {
WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName);
return asyncGetRequest(path, new FutureCallback<FailureDomainImpl>(){})
.thenApply(failureDomain -> failureDomain);
}
private void setDomain(String cluster, String domainName,
FailureDomain domain) throws PulsarAdminException {
sync(() -> setDomainAsync(cluster, domainName, domain));
}
private CompletableFuture<Void> setDomainAsync(String cluster, String domainName,
FailureDomain domain) {
WebTarget path = adminClusters.path(cluster).path("failureDomains").path(domainName);
return asyncPostRequest(path, Entity.entity((FailureDomainImpl) domain, MediaType.APPLICATION_JSON));
}
}