blob: 21f809df8734e5a165759e39db0c2440277c6ae8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin;
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.security.cert.X509Certificate;
import java.util.Map;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.admin.internal.BrokersImpl;
import org.apache.pulsar.client.admin.internal.ClustersImpl;
import org.apache.pulsar.client.admin.internal.JacksonConfigurator;
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.NamespacesImpl;
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.PersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.PropertiesImpl;
import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.util.SecurityUtility;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
/**
* Pulsar client admin API client.
*/
public class PulsarAdmin implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(PulsarAdmin.class);
private final Clusters clusters;
private final Brokers brokers;
private final BrokerStats brokerStats;
private final Properties properties;
private final Namespaces namespaces;
private final PersistentTopics persistentTopics;
private final NonPersistentTopics nonPersistentTopics;
private final ResourceQuotas resourceQuotas;
private final Client client;
private final URL serviceUrl;
private final WebTarget web;
private final Lookup lookups;
private final Authentication auth;
static {
/**
* The presence of slf4j-jdk14.jar, that is the jul binding for SLF4J, will force SLF4J calls to be delegated to
* jul. On the other hand, the presence of jul-to-slf4j.jar, plus the installation of SLF4JBridgeHandler, by
* invoking "SLF4JBridgeHandler.install()" will route jul records to SLF4J. Thus, if both jar are present
* simultaneously (and SLF4JBridgeHandler is installed), slf4j calls will be delegated to jul and jul records
* will be routed to SLF4J, resulting in an endless loop. We avoid this loop by detecting if slf4j-jdk14 is used
* in the client class path. If slf4j-jdk14 is found, we don't use the slf4j bridge.
*/
try {
Class.forName("org.slf4j.impl.JDK14LoggerFactory");
} catch (Exception ex) {
// Setup the bridge for java.util.logging to SLF4J
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
}
}
/**
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
*
* @param serviceUrl
* the Pulsar service URL (eg. "http://my-broker.example.com:8080")
* @param pulsarConfig
* the ClientConfiguration object to be used to talk with Pulsar
*/
public PulsarAdmin(URL serviceUrl, ClientConfiguration pulsarConfig) throws PulsarClientException {
this.auth = pulsarConfig != null ? pulsarConfig.getAuthentication() : new AuthenticationDisabled();
LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl,
auth != null ? auth.getAuthMethodName() : null);
if (auth != null) {
auth.start();
}
ClientConfig httpConfig = new ClientConfig();
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
ClientBuilder clientBuilder = ClientBuilder.newBuilder().withConfig(httpConfig)
.register(JacksonConfigurator.class).register(JacksonFeature.class);
if (pulsarConfig != null && pulsarConfig.isUseTls()) {
try {
SSLContext sslCtx = null;
X509Certificate trustCertificates[] = SecurityUtility
.loadCertificatesFromPemFile(pulsarConfig.getTlsTrustCertsFilePath());
// Set private key and certificate if available
AuthenticationDataProvider authData = auth.getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createSslContext(pulsarConfig.isTlsAllowInsecureConnection(),
trustCertificates, authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createSslContext(pulsarConfig.isTlsAllowInsecureConnection(),
trustCertificates);
}
clientBuilder.sslContext(sslCtx);
} catch (Exception e) {
try {
if (auth != null) {
auth.close();
}
} catch (IOException ioe) {
LOG.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
}
this.client = clientBuilder.build();
this.serviceUrl = serviceUrl;
WebTarget root = client.target(serviceUrl.toString());
web = root.path("/admin");
this.clusters = new ClustersImpl(web, auth);
this.brokers = new BrokersImpl(web, auth);
this.brokerStats = new BrokerStatsImpl(web, auth);
this.properties = new PropertiesImpl(web, auth);
this.namespaces = new NamespacesImpl(web, auth);
this.persistentTopics = new PersistentTopicsImpl(web, auth);
this.nonPersistentTopics = new NonPersistentTopicsImpl(web, auth);
this.resourceQuotas = new ResourceQuotasImpl(web, auth);
this.lookups = new LookupImpl(root, auth, pulsarConfig.isUseTls());
}
/**
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
*
* @param serviceUrl
* the Pulsar service URL (eg. "http://my-broker.example.com:8080")
* @param auth
* the Authentication object to be used to talk with Pulsar
*/
public PulsarAdmin(URL serviceUrl, Authentication auth) throws PulsarClientException {
this(serviceUrl, new ClientConfiguration() {
private static final long serialVersionUID = 1L;
{
setAuthentication(auth);
}
});
}
/**
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
*
* @param serviceUrl
* the Pulsar URL (eg. "http://my-broker.example.com:8080")
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParamsString
* string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
*/
public PulsarAdmin(URL serviceUrl, String authPluginClassName, String authParamsString) throws PulsarClientException {
this(serviceUrl, AuthenticationFactory.create(authPluginClassName, authParamsString));
}
/**
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
*
* @param serviceUrl
* the Pulsar URL (eg. "http://my-broker.example.com:8080")
* @param authPluginClassName
* name of the Authentication-Plugin you want to use
* @param authParams
* map which represents parameters for the Authentication-Plugin
*/
public PulsarAdmin(URL serviceUrl, String authPluginClassName, Map<String, String> authParams) throws PulsarClientException {
this(serviceUrl, AuthenticationFactory.create(authPluginClassName, authParams));
}
/**
* @return the clusters management object
*/
public Clusters clusters() {
return clusters;
}
/**
* @return the brokers management object
*/
public Brokers brokers() {
return brokers;
}
/**
* @return the properties management object
*/
public Properties properties() {
return properties;
}
/**
* @return the namespaces management object
*/
public Namespaces namespaces() {
return namespaces;
}
/**
* @return the persistentTopics management object
*/
public PersistentTopics persistentTopics() {
return persistentTopics;
}
/**
* @return the persistentTopics management object
*/
public NonPersistentTopics nonPersistentTopics() {
return nonPersistentTopics;
}
/**
* @return the resource quota management object
*/
public ResourceQuotas resourceQuotas() {
return resourceQuotas;
}
/**
* @return does a looks up for the broker serving the destination
*/
public Lookup lookups() {
return lookups;
}
/**
* @return the broker statics
*/
public BrokerStats brokerStats() {
return brokerStats;
}
/**
* @return the service URL that is being used
*/
public URL getServiceUrl() {
return serviceUrl;
}
/**
* Close the Pulsar admin client to release all the resources
*/
@Override
public void close() {
try {
if (auth != null) {
auth.close();
}
} catch (IOException e) {
LOG.error("Failed to close the authentication service", e);
}
client.close();
}
}