blob: ffbf7dc1a7386aded1b69a3c25bcc8a070d3bb68 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.tools;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import static net.sourceforge.argparse4j.impl.Arguments.store;
/**
* Primarily intended for use with system testing, this appender produces message
* to Kafka on each "append" request. For example, this helps with end-to-end tests
* of KafkaLog4jAppender.
*
* When used as a command-line tool, it appends increasing integers. It will produce a
* fixed number of messages unless the default max-messages -1 is used, in which case
* it appends indefinitely.
*/
public class VerifiableLog4jAppender {
Logger logger = Logger.getLogger(VerifiableLog4jAppender.class);
// If maxMessages < 0, log until the process is killed externally
private long maxMessages = -1;
// Hook to trigger logging thread to stop logging messages
private volatile boolean stopLogging = false;
/** Get the command-line argument parser. */
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("verifiable-log4j-appender")
.defaultHelp(true)
.description("This tool produces increasing integers to the specified topic using KafkaLog4jAppender.");
parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Produce messages to this topic.");
parser.addArgument("--broker-list")
.action(store())
.required(true)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("brokerList")
.help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
parser.addArgument("--max-messages")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.help("Produce this many messages. If -1, produce messages until the process is killed externally.");
parser.addArgument("--acks")
.action(store())
.required(false)
.setDefault("-1")
.type(String.class)
.choices("0", "1", "-1")
.metavar("ACKS")
.help("Acks required on each produced message. See Kafka docs on request.required.acks for details.");
parser.addArgument("--security-protocol")
.action(store())
.required(false)
.setDefault("PLAINTEXT")
.type(String.class)
.choices("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL")
.metavar("SECURITY-PROTOCOL")
.dest("securityProtocol")
.help("Security protocol to be used while communicating with Kafka brokers.");
parser.addArgument("--ssl-truststore-location")
.action(store())
.required(false)
.type(String.class)
.metavar("SSL-TRUSTSTORE-LOCATION")
.dest("sslTruststoreLocation")
.help("Location of SSL truststore to use.");
parser.addArgument("--ssl-truststore-password")
.action(store())
.required(false)
.type(String.class)
.metavar("SSL-TRUSTSTORE-PASSWORD")
.dest("sslTruststorePassword")
.help("Password for SSL truststore to use.");
parser.addArgument("--appender.config")
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG_FILE")
.help("Log4jAppender config properties file.");
parser.addArgument("--sasl-kerberos-service-name")
.action(store())
.required(false)
.type(String.class)
.metavar("SASL-KERBEROS-SERVICE-NAME")
.dest("saslKerberosServiceName")
.help("Name of sasl kerberos service.");
parser.addArgument("--client-jaas-conf-path")
.action(store())
.required(false)
.type(String.class)
.metavar("CLIENT-JAAS-CONF-PATH")
.dest("clientJaasConfPath")
.help("Path of JAAS config file of Kafka client.");
parser.addArgument("--kerb5-conf-path")
.action(store())
.required(false)
.type(String.class)
.metavar("KERB5-CONF-PATH")
.dest("kerb5ConfPath")
.help("Path of Kerb5 config file.");
return parser;
}
/**
* Read a properties file from the given path
* @param filename The path of the file to read
*
* Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
* but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
* we use VerifiableProducer from trunk tools package, and run it against 0.8.X.X kafka jars.
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
*/
public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
Properties props = new Properties();
InputStream propStream = null;
try {
propStream = new FileInputStream(filename);
props.load(propStream);
} finally {
if (propStream != null)
propStream.close();
}
return props;
}
/** Construct a VerifiableLog4jAppender object from command-line arguments. */
public static VerifiableLog4jAppender createFromArgs(String[] args) {
ArgumentParser parser = argParser();
VerifiableLog4jAppender producer = null;
try {
Namespace res = parser.parseArgs(args);
int maxMessages = res.getInt("maxMessages");
String topic = res.getString("topic");
String configFile = res.getString("appender.config");
Properties props = new Properties();
props.setProperty("log4j.rootLogger", "INFO, KAFKA");
props.setProperty("log4j.appender.KAFKA", "org.apache.kafka.log4jappender.KafkaLog4jAppender");
props.setProperty("log4j.appender.KAFKA.layout", "org.apache.log4j.PatternLayout");
props.setProperty("log4j.appender.KAFKA.layout.ConversionPattern", "%-5p: %c - %m%n");
props.setProperty("log4j.appender.KAFKA.BrokerList", res.getString("brokerList"));
props.setProperty("log4j.appender.KAFKA.Topic", topic);
props.setProperty("log4j.appender.KAFKA.RequiredNumAcks", res.getString("acks"));
props.setProperty("log4j.appender.KAFKA.SyncSend", "true");
final String securityProtocol = res.getString("securityProtocol");
if (securityProtocol != null && !securityProtocol.equals(SecurityProtocol.PLAINTEXT.toString())) {
props.setProperty("log4j.appender.KAFKA.SecurityProtocol", securityProtocol);
}
if (securityProtocol != null && securityProtocol.contains("SSL")) {
props.setProperty("log4j.appender.KAFKA.SslTruststoreLocation", res.getString("sslTruststoreLocation"));
props.setProperty("log4j.appender.KAFKA.SslTruststorePassword", res.getString("sslTruststorePassword"));
}
if (securityProtocol != null && securityProtocol.contains("SASL")) {
props.setProperty("log4j.appender.KAFKA.SaslKerberosServiceName", res.getString("saslKerberosServiceName"));
props.setProperty("log4j.appender.KAFKA.clientJaasConfPath", res.getString("clientJaasConfPath"));
props.setProperty("log4j.appender.KAFKA.kerb5ConfPath", res.getString("kerb5ConfPath"));
}
props.setProperty("log4j.logger.kafka.log4j", "INFO, KAFKA");
if (configFile != null) {
try {
props.putAll(loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
}
producer = new VerifiableLog4jAppender(props, maxMessages);
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
} else {
parser.handleError(e);
System.exit(1);
}
}
return producer;
}
public VerifiableLog4jAppender(Properties props, int maxMessages) {
this.maxMessages = maxMessages;
PropertyConfigurator.configure(props);
}
public static void main(String[] args) throws IOException {
final VerifiableLog4jAppender appender = createFromArgs(args);
boolean infinite = appender.maxMessages < 0;
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
// Trigger main thread to stop producing messages
appender.stopLogging = true;
}
});
long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages;
for (long i = 0; i < maxMessages; i++) {
if (appender.stopLogging) {
break;
}
appender.append(String.format("%d", i));
}
}
private void append(String msg) {
logger.info(msg);
}
}