| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package storm.kafka; |
| |
| import backtype.storm.Config; |
| import backtype.storm.LocalCluster; |
| import backtype.storm.generated.StormTopology; |
| import backtype.storm.tuple.Fields; |
| import backtype.storm.tuple.Values; |
| import com.google.common.collect.ImmutableMap; |
| import storm.kafka.trident.TridentKafkaStateFactory; |
| import storm.kafka.trident.TridentKafkaUpdater; |
| import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; |
| import storm.kafka.trident.selector.DefaultTopicSelector; |
| import storm.trident.Stream; |
| import storm.trident.TridentTopology; |
| import storm.trident.testing.FixedBatchSpout; |
| |
| import java.util.Properties; |
| |
| public class TridentKafkaTopology { |
| |
| private static StormTopology buildTopology(String brokerConnectionString) { |
| Fields fields = new Fields("word", "count"); |
| FixedBatchSpout spout = new FixedBatchSpout(fields, 4, |
| new Values("storm", "1"), |
| new Values("trident", "1"), |
| new Values("needs", "1"), |
| new Values("javadoc", "1") |
| ); |
| spout.setCycle(true); |
| |
| TridentTopology topology = new TridentTopology(); |
| Stream stream = topology.newStream("spout1", spout); |
| |
| Properties props = new Properties(); |
| props.put("bootstrap.servers", brokerConnectionString); |
| props.put("acks", "1"); |
| props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
| props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); |
| |
| TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() |
| .withProducerProperties(props) |
| .withKafkaTopicSelector(new DefaultTopicSelector("test")) |
| .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); |
| stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields()); |
| |
| return topology.build(); |
| } |
| |
| /** |
| * To run this topology ensure you have a kafka broker running and provide connection string to broker as argument. |
| * Create a topic test with command line, |
| * kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic test |
| * |
| * run this program and run the kafka consumer: |
| * kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning |
| * |
| * you should see the messages flowing through. |
| * |
| * @param args |
| * @throws Exception |
| */ |
| public static void main(String[] args) throws Exception { |
| if(args.length < 1) { |
| System.out.println("Please provide kafka broker url ,e.g. localhost:9092"); |
| } |
| |
| LocalCluster cluster = new LocalCluster(); |
| cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0])); |
| Thread.sleep(60 * 1000); |
| cluster.killTopology("wordCounter"); |
| |
| cluster.shutdown(); |
| } |
| } |