blob: 7677045f0899b5c5f59e8355d9e00c7247be1fed [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
/**
 * {@link ClientBuilder} implementation that accumulates settings in a {@link ClientConfigurationData}
 * instance and creates a {@link PulsarClientImpl} from it in {@link #build()}.
 *
 * <p>Every fluent setter writes into the backing configuration object and returns {@code this}.
 */
public class ClientBuilderImpl implements ClientBuilder {
    // Not final: loadConf(Map) replaces it with the object returned by ConfigurationDataUtils.loadData.
    ClientConfigurationData conf;

    /** Creates a builder backed by a fresh {@link ClientConfigurationData}. */
    public ClientBuilderImpl() {
        this(new ClientConfigurationData());
    }

    /**
     * Creates a builder backed by the given configuration object. The object is used directly
     * (not copied), so later setter calls on this builder modify it.
     *
     * @param conf the configuration this builder reads from and writes to
     */
    public ClientBuilderImpl(ClientConfigurationData conf) {
        this.conf = conf;
    }

    /**
     * Validates the service URL settings and constructs the client.
     *
     * <p>Exactly one of "service URL" and "service URL provider" must be configured. When a provider is
     * configured, its URL is read once, validated, copied into the configuration, and the provider is
     * initialized with the newly created client. If that initialization throws, the client is closed
     * before the exception is propagated so that it is not leaked.
     *
     * <p>NOTE(review): because the provider's URL is written back into the configuration, a second
     * {@code build()} on the same builder with a provider configured appears to trip the
     * "only one of URL / provider" check - confirm whether rebuilding is expected to work.
     *
     * @return the newly created client
     * @throws IllegalArgumentException if neither or both of service URL and provider are set, or the
     *                                  provider returns a blank URL
     * @throws PulsarClientException declared by the interface contract for client creation failures
     */
    @Override
    public PulsarClient build() throws PulsarClientException {
        final ServiceUrlProvider provider = conf.getServiceUrlProvider();
        checkArgument(StringUtils.isNotBlank(conf.getServiceUrl()) || provider != null,
                "service URL or service URL provider needs to be specified on the ClientBuilder object.");
        checkArgument(StringUtils.isBlank(conf.getServiceUrl()) || provider == null,
                "Can only chose one way service URL or service URL provider.");
        if (provider != null) {
            // Read the URL exactly once so the value that was validated is the value that is stored.
            final String providedUrl = provider.getServiceUrl();
            checkArgument(StringUtils.isNotBlank(providedUrl),
                    "Cannot get service url from service url provider.");
            conf.setServiceUrl(providedUrl);
        }
        final PulsarClient client = new PulsarClientImpl(conf);
        if (provider != null) {
            try {
                provider.initialize(client);
            } catch (RuntimeException initFailure) {
                // The caller never receives the client in this case, so nobody else could close it.
                closeAfterFailedInit(client, initFailure);
                throw initFailure;
            }
        }
        return client;
    }

    /**
     * Closes a client whose provider initialization failed; a failure to close is attached to the
     * original exception as suppressed rather than replacing it.
     */
    private static void closeAfterFailedInit(PulsarClient client, RuntimeException initFailure) {
        try {
            client.close();
        } catch (PulsarClientException | RuntimeException closeFailure) {
            if (closeFailure != initFailure) {
                initFailure.addSuppressed(closeFailure);
            }
        }
    }

    /** Clamps a {@code long} into the {@code int} range instead of letting a narrowing cast wrap around. */
    private static int saturatedIntCast(long value) {
        return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
    }

    /** Returns a new builder backed by a clone of the current configuration. */
    @Override
    public ClientBuilder clone() {
        return new ClientBuilderImpl(conf.clone());
    }

    /**
     * Loads the given key/value settings on top of the current configuration and, if the resulting
     * configuration carries an auth plugin class plus parameters, creates the authentication from them.
     *
     * @param config configuration entries to apply
     * @return this builder
     * @throws RuntimeException if the configured authentication cannot be created
     */
    @Override
    public ClientBuilder loadConf(Map<String, Object> config) {
        conf = ConfigurationDataUtils.loadData(config, conf, ClientConfigurationData.class);
        setAuthenticationFromPropsIfAvailable(conf);
        return this;
    }

    /**
     * Sets the service URL. If TLS is not already enabled in the configuration, it is switched on or off
     * depending on whether the URL starts with {@code pulsar+ssl} or {@code https}.
     *
     * @param serviceUrl the service URL; must not be blank
     * @return this builder
     * @throws IllegalArgumentException if {@code serviceUrl} is blank
     */
    @Override
    public ClientBuilder serviceUrl(String serviceUrl) {
        checkArgument(StringUtils.isNotBlank(serviceUrl), "Param serviceUrl must not be blank.");
        conf.setServiceUrl(serviceUrl);
        if (!conf.isUseTls()) {
            enableTls(serviceUrl.startsWith("pulsar+ssl") || serviceUrl.startsWith("https"));
        }
        return this;
    }

    /**
     * Sets the service URL provider.
     *
     * @param serviceUrlProvider the provider; must not be null
     * @return this builder
     * @throws IllegalArgumentException if {@code serviceUrlProvider} is null
     */
    @Override
    public ClientBuilder serviceUrlProvider(ServiceUrlProvider serviceUrlProvider) {
        checkArgument(serviceUrlProvider != null, "Param serviceUrlProvider must not be null.");
        conf.setServiceUrlProvider(serviceUrlProvider);
        return this;
    }

    /**
     * Sets the listener name, stored with surrounding whitespace trimmed.
     *
     * @param listenerName the listener name; must not be blank
     * @return this builder
     * @throws IllegalArgumentException if {@code listenerName} is blank
     */
    @Override
    public ClientBuilder listenerName(String listenerName) {
        checkArgument(StringUtils.isNotBlank(listenerName), "Param listenerName must not be blank.");
        conf.setListenerName(StringUtils.trim(listenerName));
        return this;
    }

    /**
     * Sets the maximum connection idle time in seconds. Negative values are accepted as-is; any
     * non-negative value must be at least {@code ConnectionPool.IDLE_DETECTION_INTERVAL_SECONDS_MIN}.
     *
     * @param connectionMaxIdleSeconds the idle limit in seconds
     * @return this builder
     * @throws IllegalArgumentException if the value is non-negative but below the minimum
     */
    @Override
    public ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds) {
        checkArgument(connectionMaxIdleSeconds < 0
                        || connectionMaxIdleSeconds >= ConnectionPool.IDLE_DETECTION_INTERVAL_SECONDS_MIN,
                "Connection idle detect interval seconds at least "
                        + ConnectionPool.IDLE_DETECTION_INTERVAL_SECONDS_MIN + ".");
        conf.setConnectionMaxIdleSeconds(connectionMaxIdleSeconds);
        return this;
    }

    /** Stores the given authentication object in the configuration. */
    @Override
    public ClientBuilder authentication(Authentication authentication) {
        conf.setAuthentication(authentication);
        return this;
    }

    /**
     * Configures authentication from a plugin class name and a parameter string. The string form
     * replaces any previously stored parameter map.
     *
     * @param authPluginClassName the authentication plugin class name
     * @param authParamsString    the plugin parameters as a single string
     * @return this builder
     * @throws UnsupportedAuthenticationException if the authentication cannot be created
     */
    @Override
    public ClientBuilder authentication(String authPluginClassName, String authParamsString)
            throws UnsupportedAuthenticationException {
        conf.setAuthPluginClassName(authPluginClassName);
        conf.setAuthParams(authParamsString);
        conf.setAuthParamMap(null);
        conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
        return this;
    }

    /**
     * Configures authentication from a plugin class name and a parameter map. The map form replaces
     * any previously stored parameter string.
     *
     * @param authPluginClassName the authentication plugin class name
     * @param authParams          the plugin parameters as key/value pairs
     * @return this builder
     * @throws UnsupportedAuthenticationException if the authentication cannot be created
     */
    @Override
    public ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
            throws UnsupportedAuthenticationException {
        conf.setAuthPluginClassName(authPluginClassName);
        conf.setAuthParamMap(authParams);
        conf.setAuthParams(null);
        conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
        return this;
    }

    /**
     * Creates the authentication from the plugin class name and parameters held by the given
     * configuration, if both are present. A non-blank parameter string takes precedence over the
     * parameter map.
     *
     * @throws RuntimeException wrapping the {@link UnsupportedAuthenticationException} on failure
     */
    private void setAuthenticationFromPropsIfAvailable(ClientConfigurationData clientConfig) {
        String authPluginClass = clientConfig.getAuthPluginClassName();
        String authParams = clientConfig.getAuthParams();
        Map<String, String> authParamMap = clientConfig.getAuthParamMap();
        if (StringUtils.isBlank(authPluginClass) || (StringUtils.isBlank(authParams) && authParamMap == null)) {
            return;
        }
        try {
            if (StringUtils.isNotBlank(authParams)) {
                authentication(authPluginClass, authParams);
            } else {
                // The guard above guarantees the map is non-null whenever the string form is blank.
                authentication(authPluginClass, authParamMap);
            }
        } catch (UnsupportedAuthenticationException ex) {
            throw new RuntimeException("Failed to create authentication: " + ex.getMessage(), ex);
        }
    }

    /**
     * Sets the operation timeout, stored in milliseconds.
     *
     * @throws IllegalArgumentException if {@code operationTimeout} is negative
     */
    @Override
    public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
        checkArgument(operationTimeout >= 0, "operationTimeout needs to be >= 0");
        conf.setOperationTimeoutMs(unit.toMillis(operationTimeout));
        return this;
    }

    /** Sets the lookup timeout, stored in milliseconds. The value is not range-checked here. */
    @Override
    public ClientBuilder lookupTimeout(int lookupTimeout, TimeUnit unit) {
        conf.setLookupTimeoutMs(unit.toMillis(lookupTimeout));
        return this;
    }

    /**
     * Sets the number of IO threads.
     *
     * @throws IllegalArgumentException if {@code numIoThreads} is not positive
     */
    @Override
    public ClientBuilder ioThreads(int numIoThreads) {
        checkArgument(numIoThreads > 0, "ioThreads needs to be > 0");
        conf.setNumIoThreads(numIoThreads);
        return this;
    }

    /**
     * Sets the number of listener threads.
     *
     * @throws IllegalArgumentException if {@code numListenerThreads} is not positive
     */
    @Override
    public ClientBuilder listenerThreads(int numListenerThreads) {
        checkArgument(numListenerThreads > 0, "listenerThreads needs to be > 0");
        conf.setNumListenerThreads(numListenerThreads);
        return this;
    }

    /**
     * Sets the number of connections per broker.
     *
     * @throws IllegalArgumentException if {@code connectionsPerBroker} is negative
     */
    @Override
    public ClientBuilder connectionsPerBroker(int connectionsPerBroker) {
        checkArgument(connectionsPerBroker >= 0, "connectionsPerBroker needs to be >= 0");
        conf.setConnectionsPerBroker(connectionsPerBroker);
        return this;
    }

    /** Stores the TCP no-delay flag in the configuration. */
    @Override
    public ClientBuilder enableTcpNoDelay(boolean useTcpNoDelay) {
        conf.setUseTcpNoDelay(useTcpNoDelay);
        return this;
    }

    /** Stores the use-TLS flag in the configuration. */
    @Override
    public ClientBuilder enableTls(boolean useTls) {
        conf.setUseTls(useTls);
        return this;
    }

    /** Stores the TLS key file path in the configuration. */
    @Override
    public ClientBuilder tlsKeyFilePath(String tlsKeyFilePath) {
        conf.setTlsKeyFilePath(tlsKeyFilePath);
        return this;
    }

    /** Stores the TLS certificate file path in the configuration. */
    @Override
    public ClientBuilder tlsCertificateFilePath(String tlsCertificateFilePath) {
        conf.setTlsCertificateFilePath(tlsCertificateFilePath);
        return this;
    }

    /** Stores the TLS hostname verification flag in the configuration. */
    @Override
    public ClientBuilder enableTlsHostnameVerification(boolean enableTlsHostnameVerification) {
        conf.setTlsHostnameVerificationEnable(enableTlsHostnameVerification);
        return this;
    }

    /** Stores the TLS trust certificates file path in the configuration. */
    @Override
    public ClientBuilder tlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
        conf.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
        return this;
    }

    /** Stores the allow-insecure-TLS-connection flag in the configuration. */
    @Override
    public ClientBuilder allowTlsInsecureConnection(boolean tlsAllowInsecureConnection) {
        conf.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
        return this;
    }

    /** Stores the use-key-store-TLS flag in the configuration. */
    @Override
    public ClientBuilder useKeyStoreTls(boolean useKeyStoreTls) {
        conf.setUseKeyStoreTls(useKeyStoreTls);
        return this;
    }

    /** Stores the SSL provider name in the configuration. */
    @Override
    public ClientBuilder sslProvider(String sslProvider) {
        conf.setSslProvider(sslProvider);
        return this;
    }

    /** Stores the TLS key store type in the configuration. */
    @Override
    public ClientBuilder tlsKeyStoreType(String tlsKeyStoreType) {
        conf.setTlsKeyStoreType(tlsKeyStoreType);
        return this;
    }

    /**
     * Stores the TLS <em>key</em> store path in the configuration.
     *
     * @param tlsTrustStorePath the key store path (the parameter name is misleading; the value is
     *                          written to the key store path setting, not the trust store one)
     */
    @Override
    public ClientBuilder tlsKeyStorePath(String tlsTrustStorePath) {
        conf.setTlsKeyStorePath(tlsTrustStorePath);
        return this;
    }

    /** Stores the TLS key store password in the configuration. */
    @Override
    public ClientBuilder tlsKeyStorePassword(String tlsKeyStorePassword) {
        conf.setTlsKeyStorePassword(tlsKeyStorePassword);
        return this;
    }

    /** Stores the TLS trust store type in the configuration. */
    @Override
    public ClientBuilder tlsTrustStoreType(String tlsTrustStoreType) {
        conf.setTlsTrustStoreType(tlsTrustStoreType);
        return this;
    }

    /** Stores the TLS trust store path in the configuration. */
    @Override
    public ClientBuilder tlsTrustStorePath(String tlsTrustStorePath) {
        conf.setTlsTrustStorePath(tlsTrustStorePath);
        return this;
    }

    /** Stores the TLS trust store password in the configuration. */
    @Override
    public ClientBuilder tlsTrustStorePassword(String tlsTrustStorePassword) {
        conf.setTlsTrustStorePassword(tlsTrustStorePassword);
        return this;
    }

    /** Stores the set of TLS ciphers in the configuration. */
    @Override
    public ClientBuilder tlsCiphers(Set<String> tlsCiphers) {
        conf.setTlsCiphers(tlsCiphers);
        return this;
    }

    /** Stores the set of TLS protocols in the configuration. */
    @Override
    public ClientBuilder tlsProtocols(Set<String> tlsProtocols) {
        conf.setTlsProtocols(tlsProtocols);
        return this;
    }

    /** Sets the stats interval, stored in whole seconds. */
    @Override
    public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
        conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
        return this;
    }

    /** Stores the maximum number of concurrent lookup requests in the configuration. */
    @Override
    public ClientBuilder maxConcurrentLookupRequests(int concurrentLookupRequests) {
        conf.setConcurrentLookupRequest(concurrentLookupRequests);
        return this;
    }

    /** Stores the maximum number of lookup requests in the configuration. */
    @Override
    public ClientBuilder maxLookupRequests(int maxLookupRequests) {
        conf.setMaxLookupRequest(maxLookupRequests);
        return this;
    }

    /** Stores the maximum number of lookup redirects in the configuration. */
    @Override
    public ClientBuilder maxLookupRedirects(int maxLookupRedirects) {
        conf.setMaxLookupRedirects(maxLookupRedirects);
        return this;
    }

    /** Stores the maximum number of rejected requests per connection in the configuration. */
    @Override
    public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
        conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
        return this;
    }

    /**
     * Sets the keep-alive interval, stored in whole seconds. A converted value outside the
     * {@code int} range is clamped to the nearest {@code int} bound instead of wrapping around.
     */
    @Override
    public ClientBuilder keepAliveInterval(int keepAliveInterval, TimeUnit unit) {
        conf.setKeepAliveIntervalSeconds(saturatedIntCast(unit.toSeconds(keepAliveInterval)));
        return this;
    }

    /**
     * Sets the connection timeout, stored in milliseconds. A converted value outside the {@code int}
     * range (e.g. 30 days expressed in milliseconds) is clamped to the nearest {@code int} bound
     * instead of wrapping around to an unrelated, possibly negative, number.
     */
    @Override
    public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
        conf.setConnectionTimeoutMs(saturatedIntCast(unit.toMillis(duration)));
        return this;
    }

    /** Sets the initial backoff interval, stored in nanoseconds. */
    @Override
    public ClientBuilder startingBackoffInterval(long duration, TimeUnit unit) {
        conf.setInitialBackoffIntervalNanos(unit.toNanos(duration));
        return this;
    }

    /** Sets the maximum backoff interval, stored in nanoseconds. */
    @Override
    public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
        conf.setMaxBackoffIntervalNanos(unit.toNanos(duration));
        return this;
    }

    /** Stores the busy-wait flag in the configuration. */
    @Override
    public ClientBuilder enableBusyWait(boolean enableBusyWait) {
        conf.setEnableBusyWait(enableBusyWait);
        return this;
    }

    /** Returns the live configuration object backing this builder (not a copy). */
    public ClientConfigurationData getClientConfigurationData() {
        return conf;
    }

    /** Sets the memory limit, stored in bytes. */
    @Override
    public ClientBuilder memoryLimit(long memoryLimit, SizeUnit unit) {
        conf.setMemoryLimitBytes(unit.toBytes(memoryLimit));
        return this;
    }

    /** Stores the clock in the configuration. */
    @Override
    public ClientBuilder clock(Clock clock) {
        conf.setClock(clock);
        return this;
    }

    /**
     * Sets the proxy service URL and protocol. A protocol is mandatory only when the URL is non-blank.
     *
     * @throws IllegalArgumentException if {@code proxyServiceUrl} is non-blank and
     *                                  {@code proxyProtocol} is null
     */
    @Override
    public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) {
        if (StringUtils.isNotBlank(proxyServiceUrl)) {
            checkArgument(proxyProtocol != null, "proxyProtocol must be present with proxyServiceUrl");
        }
        conf.setProxyServiceUrl(proxyServiceUrl);
        conf.setProxyProtocol(proxyProtocol);
        return this;
    }

    /** Stores the enable-transaction flag in the configuration. */
    @Override
    public ClientBuilder enableTransaction(boolean enableTransaction) {
        conf.setEnableTransaction(enableTransaction);
        return this;
    }

    /**
     * Sets the local address and port used for the DNS lookup bind settings.
     *
     * @throws IllegalArgumentException if {@code port} is outside {@code [0, 65535]}
     */
    @Override
    public ClientBuilder dnsLookupBind(String address, int port) {
        checkArgument(port >= 0 && port <= 65535, "DnsLookBindPort need to be within the range of 0 and 65535");
        conf.setDnsLookupBindAddress(address);
        conf.setDnsLookupBindPort(port);
        return this;
    }

    /** Stores the SOCKS5 proxy address in the configuration. */
    @Override
    public ClientBuilder socks5ProxyAddress(InetSocketAddress socks5ProxyAddress) {
        conf.setSocks5ProxyAddress(socks5ProxyAddress);
        return this;
    }

    /** Stores the SOCKS5 proxy username in the configuration. */
    @Override
    public ClientBuilder socks5ProxyUsername(String socks5ProxyUsername) {
        conf.setSocks5ProxyUsername(socks5ProxyUsername);
        return this;
    }

    /** Stores the SOCKS5 proxy password in the configuration. */
    @Override
    public ClientBuilder socks5ProxyPassword(String socks5ProxyPassword) {
        conf.setSocks5ProxyPassword(socks5ProxyPassword);
        return this;
    }

    /**
     * Set the description.
     *
     * <p> By default, when the client connects to the broker, a version string like
     * {@code "Pulsar-Java-v<x.y.z>"} will be carried and saved by the broker. The client version string could be
     * queried from the topic stats.
     *
     * <p> This method provides a way to add more description to a specific PulsarClient instance. If it's configured,
     * the description will be appended to the original client version string, with '-' as the separator.
     *
     * <p>For example, if the client version is 3.0.0, and the description is "forked", the final client version string
     * will be "Pulsar-Java-v3.0.0-forked".
     *
     * @param description the description of the current PulsarClient instance; {@code null} is accepted
     * @return this builder
     * @throws IllegalArgumentException if the length of description exceeds 64
     */
    public ClientBuilder description(String description) {
        if (description != null && description.length() > 64) {
            throw new IllegalArgumentException("description should be at most 64 characters");
        }
        conf.setDescription(description);
        return this;
    }
}