blob: 7d5a530142d1b91dc145eb8588b389bf63adaea7 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.edgent.samples.connectors.kafka;
import org.apache.edgent.samples.connectors.Options;
/**
* Demonstrate integrating with the Apache Kafka messaging system
* <a href="http://kafka.apache.org">http://kafka.apache.org</a>.
* <p>
* {@link org.apache.edgent.connectors.kafka.KafkaProducer KafkaProducer} is
* a connector used to create a bridge between topology streams
* and publishing to Kafka topics.
* <p>
* {@link org.apache.edgent.connectors.kafka.KafkaConsumer KafkaConsumer} is
* a connector used to create a bridge between topology streams
* and subscribing to Kafka topics.
* <p>
* The client either publishes messages to a topic or
* subscribes to the topic and reports the messages received.
* <p>
* By default, a running Kafka cluster with the following
* characteristics is assumed:
* <ul>
* <li>{@code bootstrap.servers="localhost:9092"}</li>
* <li>{@code zookeeper.connect="localhost:2181"}</li>
* <li>kafka topic {@code "kafkaSampleTopic"} exists</li>
* </ul>
* <p>
* See the Apache Kafka link above for information about setting up a Kafka
* cluster as well as creating a topic.
* <p>
* This may be executed from as:
* <UL>
* <LI>
* {@code java -cp samples/lib/org.apache.edgent.samples.connectors.kafka.jar
* org.apache.edgent.samples.connectors.kafka.KafkaClient -h
* } - Run directly from the command line.
* </LI>
* </UL>
* <UL>
* <LI>
* An application execution within your IDE once you set the class path to include the correct jars.</LI>
* </UL>
*/
public class KafkaClient {
private static final String usage = "usage: "
+ "\n" + "[-v] [-h]"
+ "\n" + "pub | sub"
+ "\n" + "[bootstrap.servers=<value>]"
+ "\n" + "[zookeeper.connect=<value>]"
+ "\n" + "[group.id=<value>]"
+ "\n" + "[pubcnt=<value>]"
;
public static void main(String[] args) throws Exception {
Options options = processArgs(args);
if (options == null)
return;
Runner.run(options);
}
private static Options processArgs(String[] args) {
Options options = new Options();
initHandlers(options);
try {
options.processArgs(args);
}
catch (Exception e) {
System.err.println(e);
System.out.println(usage);
return null;
}
if ((Boolean)options.get(OPT_HELP)) {
System.out.println(usage);
return null;
}
if (!(Boolean)options.get(OPT_PUB) && !(Boolean)options.get(OPT_SUB)) {
System.err.println(String.format("Missing argument '%s' or '%s'.", OPT_PUB, OPT_SUB));
System.out.println(usage);
return null;
}
String[] announceOpts = new String[] {
};
if ((Boolean)options.get(OPT_VERBOSE))
announceOpts = options.getAll().stream().map(e -> e.getKey()).toArray(String[]::new);
for (String opt : announceOpts) {
Object value = options.get(opt);
if (value != null) {
if (opt.toLowerCase().contains("password"))
value = "*****";
System.out.println("Using "+opt+"="+value);
}
}
return options;
}
static final String OPT_VERBOSE = "-v";
static final String OPT_HELP = "-h";
static final String OPT_PUB = "pub";
static final String OPT_SUB = "sub";
static final String OPT_BOOTSTRAP_SERVERS = "bootstrap.servers";
static final String OPT_ZOOKEEPER_CONNECT = "zookeeper.connect";
static final String OPT_GROUP_ID = "group.id";
static final String OPT_TOPIC = "topic";
static final String OPT_PUB_CNT = "pubcnt";
private static void initHandlers(Options opts) {
// options for which we have a default
opts.addHandler(OPT_HELP, null, false);
opts.addHandler(OPT_VERBOSE, null, false);
opts.addHandler(OPT_PUB, null, false);
opts.addHandler(OPT_SUB, null, false);
opts.addHandler(OPT_BOOTSTRAP_SERVERS, v -> v, "localhost:9092");
opts.addHandler(OPT_ZOOKEEPER_CONNECT, v -> v, "localhost:2181");
opts.addHandler(OPT_TOPIC, v -> v, "kafkaSampleTopic");
opts.addHandler(OPT_PUB_CNT, v -> Integer.valueOf(v), -1);
// optional options (no default value)
opts.addHandler(OPT_GROUP_ID, v -> v);
}
}