blob: 0c82da726b20fe20f51a40bfad00bea929447ede [file] [log] [blame]
/*
* Copyright 2017 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.spout;
import static org.apache.storm.kafka.spout.KafkaSpoutTopologyMainNamedTopics.TOPIC_0;
import static org.apache.storm.kafka.spout.KafkaSpoutTopologyMainNamedTopics.TOPIC_1;
import static org.apache.storm.kafka.spout.KafkaSpoutTopologyMainNamedTopics.TOPIC_2;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.bolt.KafkaProducerTopology;
public class KafkaSpoutTopologyMainNamedTopicsLocal {
public static void main(String[] args) throws Exception {
new KafkaSpoutTopologyMainNamedTopicsLocal().runExample();
}
protected void runExample() throws Exception {
String brokerUrl = "localhost:9092";
KafkaSpoutTopologyMainNamedTopics example = getTopology();
Config tpConf = example.getConfig();
LocalCluster localCluster = new LocalCluster();
// Producers. This is just to get some data in Kafka, normally you would be getting this data from elsewhere
localCluster.submitTopology(TOPIC_0 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_0));
localCluster.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
localCluster.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
//Consumer. Sets up a topology that reads the given Kafka spouts and logs the received messages
localCluster.submitTopology("storm-kafka-client-spout-test", tpConf, example.getTopologyKafkaSpout(example.getKafkaSpoutConfig(brokerUrl)));
stopWaitingForInput();
}
protected KafkaSpoutTopologyMainNamedTopics getTopology() {
return new KafkaSpoutTopologyMainNamedTopics();
}
protected void stopWaitingForInput() {
try {
System.out.println("PRESS ENTER TO STOP");
new BufferedReader(new InputStreamReader(System.in)).readLine();
System.exit(0);
} catch (IOException e) {
e.printStackTrace();
}
}
}