blob: 21f809df8734e5a165759e39db0c2440277c6ae8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.pulsar.client.admin;
import java.util.Map;
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.admin.internal.BrokersImpl;
import org.apache.pulsar.client.admin.internal.ClustersImpl;
import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.PersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.PropertiesImpl;
import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.util.SecurityUtility;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
* Pulsar client admin API client.
public class PulsarAdmin implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarAdmin.class);
private final Clusters clusters;
private final Brokers brokers;
private final BrokerStats brokerStats;
private final Properties properties;
private final Namespaces namespaces;
private final PersistentTopics persistentTopics;
private final NonPersistentTopics nonPersistentTopics;
private final ResourceQuotas resourceQuotas;
private final Client client;
private final URL serviceUrl;
private final WebTarget web;
private final Lookup lookups;
private final Authentication auth;
static {
* The presence of slf4j-jdk14.jar, that is the jul binding for SLF4J, will force SLF4J calls to be delegated to
* jul. On the other hand, the presence of jul-to-slf4j.jar, plus the installation of SLF4JBridgeHandler, by
* invoking "SLF4JBridgeHandler.install()" will route jul records to SLF4J. Thus, if both jar are present
* simultaneously (and SLF4JBridgeHandler is installed), slf4j calls will be delegated to jul and jul records
* will be routed to SLF4J, resulting in an endless loop. We avoid this loop by detecting if slf4j-jdk14 is used
* in the client class path. If slf4j-jdk14 is found, we don't use the slf4j bridge.
try {
} catch (Exception ex) {
// Setup the bridge for java.util.logging to SLF4J
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
* @param serviceUrl
* the Pulsar service URL (eg. "")
* @param pulsarConfig
* the ClientConfiguration object to be used to talk with Pulsar
public PulsarAdmin(URL serviceUrl, ClientConfiguration pulsarConfig) throws PulsarClientException {
this.auth = pulsarConfig != null ? pulsarConfig.getAuthentication() : new AuthenticationDisabled();
LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl,
auth != null ? auth.getAuthMethodName() : null);
if (auth != null) {
ClientConfig httpConfig = new ClientConfig();, true);, 8);
ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig)
if (pulsarConfig != null && pulsarConfig.isUseTls()) {
try {
SSLContext sslCtx = null;
X509Certificate trustCertificates[] = SecurityUtility
// Set private key and certificate if available
AuthenticationDataProvider authData = auth.getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createSslContext(pulsarConfig.isTlsAllowInsecureConnection(),
trustCertificates, authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createSslContext(pulsarConfig.isTlsAllowInsecureConnection(),
} catch (Exception e) {
try {
if (auth != null) {
} catch (IOException ioe) {
LOG.error("Failed to close the authentication service", ioe);
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
this.client =;
this.serviceUrl = serviceUrl;
WebTarget root =;
web = root.path("/admin");
this.clusters = new ClustersImpl(web, auth);
this.brokers = new BrokersImpl(web, auth);
this.brokerStats = new BrokerStatsImpl(web, auth); = new PropertiesImpl(web, auth);
this.namespaces = new NamespacesImpl(web, auth);
this.persistentTopics = new PersistentTopicsImpl(web, auth);
this.nonPersistentTopics = new NonPersistentTopicsImpl(web, auth);
this.resourceQuotas = new ResourceQuotasImpl(web, auth);
this.lookups = new LookupImpl(root, auth, pulsarConfig.isUseTls());
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
* @param serviceUrl
* the Pulsar service URL (eg. "")
* @param auth
* the Authentication object to be used to talk with Pulsar
public PulsarAdmin(URL serviceUrl, Authentication auth) throws PulsarClientException {
this(serviceUrl, new ClientConfiguration() {
private static final long serialVersionUID = 1L;
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
* @param serviceUrl
* the Pulsar URL (eg. "")
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParamsString
* string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
public PulsarAdmin(URL serviceUrl, String authPluginClassName, String authParamsString) throws PulsarClientException {
this(serviceUrl, AuthenticationFactory.create(authPluginClassName, authParamsString));
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
* @param serviceUrl
* the Pulsar URL (eg. "")
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParams
* map which represents parameters for the Authentication-Plugin
public PulsarAdmin(URL serviceUrl, String authPluginClassName, Map<String, String> authParams) throws PulsarClientException {
this(serviceUrl, AuthenticationFactory.create(authPluginClassName, authParams));
* @return the clusters management object
public Clusters clusters() {
return clusters;
* @return the brokers management object
public Brokers brokers() {
return brokers;
* @return the properties management object
public Properties properties() {
return properties;
* @return the namespaces management object
public Namespaces namespaces() {
return namespaces;
* @return the persistentTopics management object
public PersistentTopics persistentTopics() {
return persistentTopics;
* @return the persistentTopics management object
public NonPersistentTopics nonPersistentTopics() {
return nonPersistentTopics;
* @return the resource quota management object
public ResourceQuotas resourceQuotas() {
return resourceQuotas;
* @return does a looks up for the broker serving the destination
public Lookup lookups() {
return lookups;
* @return the broker statics
public BrokerStats brokerStats() {
return brokerStats;
* @return the service URL that is being used
public URL getServiceUrl() {
return serviceUrl;
* Close the Pulsar admin client to release all the resources
public void close() {
try {
if (auth != null) {
} catch (IOException e) {
LOG.error("Failed to close the authentication service", e);