blob: 04fd44fa58a0f1a92bd71d40b7a3a8c92014bcf2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.cli;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import com.beust.jcommander.DefaultUsageFormatter;
import com.beust.jcommander.IUsageFormatter;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
@Parameters(commandDescription = "Produce or consume messages on a specified topic")
public class PulsarClientTool {
@Parameter(names = { "--url" }, description = "Broker URL to which to connect.")
String serviceURL = null;
@Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.")
String proxyServiceURL = null;
@Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.")
ProxyProtocol proxyProtocol = null;
@Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
String authPluginClassName = null;
@Parameter(names = { "--listener-name" }, description = "Listener name for the broker.")
String listenerName = null;
@Parameter(
names = { "--auth-params" },
description = "Authentication parameters, whose format is determined by the implementation " +
"of method `configure` in authentication plugin class, for example \"key1:val1,key2:val2\" " +
"or \"{\"key1\":\"val1\",\"key2\":\"val2\"}.")
String authParams = null;
@Parameter(names = { "-v", "--version" }, description = "Get version of pulsar client")
boolean version;
@Parameter(names = { "-h", "--help", }, help = true, description = "Show this help.")
boolean help;
boolean tlsAllowInsecureConnection = false;
boolean tlsEnableHostnameVerification = false;
String tlsTrustCertsFilePath = null;
// for tls with keystore type config
boolean useKeyStoreTls = false;
String tlsTrustStoreType = "JKS";
String tlsTrustStorePath = null;
String tlsTrustStorePassword = null;
JCommander commandParser;
IUsageFormatter usageFormatter;
CmdProduce produceCommand;
CmdConsume consumeCommand;
public PulsarClientTool(Properties properties) {
this.serviceURL = StringUtils.isNotBlank(properties.getProperty("brokerServiceUrl"))
? properties.getProperty("brokerServiceUrl") : properties.getProperty("webServiceUrl");
// fallback to previous-version serviceUrl property to maintain backward-compatibility
if (StringUtils.isBlank(this.serviceURL)) {
this.serviceURL = properties.getProperty("serviceUrl");
}
this.authPluginClassName = properties.getProperty("authPlugin");
this.authParams = properties.getProperty("authParams");
this.tlsAllowInsecureConnection = Boolean
.parseBoolean(properties.getProperty("tlsAllowInsecureConnection", "false"));
this.tlsEnableHostnameVerification = Boolean
.parseBoolean(properties.getProperty("tlsEnableHostnameVerification", "false"));
this.tlsTrustCertsFilePath = properties.getProperty("tlsTrustCertsFilePath");
this.useKeyStoreTls = Boolean
.parseBoolean(properties.getProperty("useKeyStoreTls", "false"));
this.tlsTrustStoreType = properties.getProperty("tlsTrustStoreType", "JKS");
this.tlsTrustStorePath = properties.getProperty("tlsTrustStorePath");
this.tlsTrustStorePassword = properties.getProperty("tlsTrustStorePassword");
produceCommand = new CmdProduce();
consumeCommand = new CmdConsume();
this.commandParser = new JCommander();
this.usageFormatter = new DefaultUsageFormatter(this.commandParser);
commandParser.setProgramName("pulsar-client");
commandParser.addObject(this);
commandParser.addCommand("produce", produceCommand);
commandParser.addCommand("consume", consumeCommand);
}
private void updateConfig() throws UnsupportedAuthenticationException {
ClientBuilder clientBuilder = PulsarClient.builder();
Authentication authentication = null;
if (isNotBlank(this.authPluginClassName)) {
authentication = AuthenticationFactory.create(authPluginClassName, authParams);
clientBuilder.authentication(authentication);
}
if (isNotBlank(this.listenerName)) {
clientBuilder.listenerName(this.listenerName);
}
clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
clientBuilder.enableTlsHostnameVerification(this.tlsEnableHostnameVerification);
clientBuilder.serviceUrl(serviceURL);
clientBuilder.useKeyStoreTls(useKeyStoreTls)
.tlsTrustStoreType(tlsTrustStoreType)
.tlsTrustStorePath(tlsTrustStorePath)
.tlsTrustStorePassword(tlsTrustStorePassword);
if (StringUtils.isNotBlank(proxyServiceURL)) {
if (proxyProtocol == null) {
System.out.println("proxy-protocol must be provided with proxy-url");
System.exit(-1);
}
clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol);
}
this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
}
public int run(String[] args) {
try {
commandParser.parse(args);
if (isBlank(this.serviceURL)) {
commandParser.usage();
return -1;
}
if (version) {
System.out.println("Current version of pulsar client is: " + PulsarVersion.getVersion());
return 0;
}
if (help) {
commandParser.usage();
return 0;
}
try {
this.updateConfig(); // If the --url, --auth-plugin, or --auth-params parameter are not specified,
// it will default to the values passed in by the constructor
} catch (UnsupportedAuthenticationException exp) {
System.out.println("Failed to load an authentication plugin");
exp.printStackTrace();
return -1;
}
String chosenCommand = commandParser.getParsedCommand();
if ("produce".equals(chosenCommand)) {
return produceCommand.run();
} else if ("consume".equals(chosenCommand)) {
return consumeCommand.run();
} else {
commandParser.usage();
return -1;
}
} catch (Exception e) {
System.out.println(e.getMessage());
String chosenCommand = commandParser.getParsedCommand();
if (e instanceof ParameterException) {
usageFormatter.usage(chosenCommand);
} else {
e.printStackTrace();
}
return -1;
}
}
public static void main(String[] args) throws Exception {
if (args.length == 0) {
System.out.println("Usage: pulsar-client CONF_FILE_PATH [options] [command] [command options]");
System.exit(-1);
}
String configFile = args[0];
Properties properties = new Properties();
if (configFile != null) {
try (FileInputStream fis = new FileInputStream(configFile)) {
properties.load(fis);
}
}
PulsarClientTool clientTool = new PulsarClientTool(properties);
int exit_code = clientTool.run(Arrays.copyOfRange(args, 1, args.length));
System.exit(exit_code);
}
}