| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version |
| * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions |
| * and limitations under the License. |
| */ |
| |
| package org.apache.storm.starter; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileReader; |
| import java.io.FileWriter; |
| import java.io.IOException; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.StringTokenizer; |
| import org.apache.storm.Config; |
| import org.apache.storm.StormSubmitter; |
| import org.apache.storm.blobstore.AtomicOutputStream; |
| import org.apache.storm.blobstore.BlobStoreAclHandler; |
| import org.apache.storm.blobstore.ClientBlobStore; |
| import org.apache.storm.generated.AccessControl; |
| import org.apache.storm.generated.AlreadyAliveException; |
| import org.apache.storm.generated.AuthorizationException; |
| import org.apache.storm.generated.InvalidTopologyException; |
| import org.apache.storm.generated.KeyAlreadyExistsException; |
| import org.apache.storm.generated.KeyNotFoundException; |
| import org.apache.storm.generated.SettableBlobMeta; |
| import org.apache.storm.spout.SpoutOutputCollector; |
| import org.apache.storm.task.ShellBolt; |
| import org.apache.storm.task.TopologyContext; |
| import org.apache.storm.topology.BasicOutputCollector; |
| import org.apache.storm.topology.IRichBolt; |
| import org.apache.storm.topology.OutputFieldsDeclarer; |
| import org.apache.storm.topology.TopologyBuilder; |
| import org.apache.storm.topology.base.BaseBasicBolt; |
| import org.apache.storm.topology.base.BaseRichSpout; |
| import org.apache.storm.tuple.Fields; |
| import org.apache.storm.tuple.Tuple; |
| import org.apache.storm.tuple.Values; |
| import org.apache.storm.utils.Utils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @SuppressWarnings("checkstyle:AbbreviationAsWordInName") |
| public class BlobStoreAPIWordCountTopology { |
| private static final Logger LOG = LoggerFactory.getLogger(BlobStoreAPIWordCountTopology.class); |
| private static ClientBlobStore store; // Client API to invoke blob store API functionality |
| private static String key = "key"; |
| private static String fileName = "blacklist.txt"; |
| |
| public static void prepare() { |
| Config conf = new Config(); |
| conf.putAll(Utils.readStormConfig()); |
| store = Utils.getClientBlobStore(conf); |
| } |
| |
| // Equivalent create command on command line |
| // storm blobstore create --file blacklist.txt --acl o::rwa key |
| private static void createBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file) |
| throws AuthorizationException, KeyAlreadyExistsException, IOException, KeyNotFoundException { |
| String stringBlobAcl = "o::rwa"; |
| AccessControl blobAcl = BlobStoreAclHandler.parseAccessControl(stringBlobAcl); |
| List<AccessControl> acls = new LinkedList<AccessControl>(); |
| acls.add(blobAcl); // more ACLs can be added here |
| SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls); |
| AtomicOutputStream blobStream = clientBlobStore.createBlob(blobKey, settableBlobMeta); |
| blobStream.write(readFile(file).toString().getBytes()); |
| blobStream.close(); |
| } |
| |
| // Equivalent update command on command line |
| // storm blobstore update --file blacklist.txt key |
| private static void updateBlobWithContent(String blobKey, ClientBlobStore clientBlobStore, File file) |
| throws KeyNotFoundException, AuthorizationException, IOException { |
| AtomicOutputStream blobOutputStream = clientBlobStore.updateBlob(blobKey); |
| blobOutputStream.write(readFile(file).toString().getBytes()); |
| blobOutputStream.close(); |
| } |
| |
| private static String getRandomSentence() { |
| String[] sentences = new String[]{ |
| "the cow jumped over the moon", "an apple a day keeps the doctor away", |
| "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" |
| }; |
| String sentence = sentences[new Random().nextInt(sentences.length)]; |
| return sentence; |
| } |
| |
| private static Set<String> getRandomWordSet() { |
| Set<String> randomWordSet = new HashSet<>(); |
| Random random = new Random(); |
| String[] words = new String[]{ |
| "cow", "jumped", "over", "the", "moon", "apple", "day", "doctor", "away", |
| "four", "seven", "ago", "snow", "white", "seven", "dwarfs", "nature", "two" |
| }; |
| // Choosing atmost 5 words to update the blacklist file for filtering |
| for (int i = 0; i < 5; i++) { |
| randomWordSet.add(words[random.nextInt(words.length)]); |
| } |
| return randomWordSet; |
| } |
| |
| private static Set<String> parseFile(String fileName) throws IOException { |
| File file = new File(fileName); |
| Set<String> wordSet = new HashSet<>(); |
| if (!file.exists()) { |
| return wordSet; |
| } |
| StringTokenizer tokens = new StringTokenizer(readFile(file).toString(), "\r\n"); |
| while (tokens.hasMoreElements()) { |
| wordSet.add(tokens.nextToken()); |
| } |
| LOG.debug("parseFile {}", wordSet); |
| return wordSet; |
| } |
| |
| private static StringBuilder readFile(File file) throws IOException { |
| String line; |
| StringBuilder fileContent = new StringBuilder(); |
| // Do not use canonical file name here as we are using |
| // symbolic links to read file data and performing atomic move |
| // while updating files |
| BufferedReader br = new BufferedReader(new FileReader(file)); |
| while ((line = br.readLine()) != null) { |
| fileContent.append(line); |
| fileContent.append(System.lineSeparator()); |
| } |
| return fileContent; |
| } |
| |
| // Creating a blacklist file to read from the disk |
| public static File createFile(String fileName) throws IOException { |
| File file = null; |
| file = new File(fileName); |
| if (!file.exists()) { |
| file.createNewFile(); |
| } |
| writeToFile(file, getRandomWordSet()); |
| return file; |
| } |
| |
| // Updating a blacklist file periodically with random words |
| public static File updateFile(File file) throws IOException { |
| writeToFile(file, getRandomWordSet()); |
| return file; |
| } |
| |
| // Writing random words to be blacklisted |
| public static void writeToFile(File file, Set<String> content) throws IOException { |
| FileWriter fw = new FileWriter(file, false); |
| BufferedWriter bw = new BufferedWriter(fw); |
| Iterator<String> iter = content.iterator(); |
| while (iter.hasNext()) { |
| bw.write(iter.next()); |
| bw.write(System.lineSeparator()); |
| } |
| bw.close(); |
| } |
| |
| public static void main(String[] args) { |
| prepare(); |
| BlobStoreAPIWordCountTopology wc = new BlobStoreAPIWordCountTopology(); |
| try { |
| File file = createFile(fileName); |
| // Creating blob again before launching topology |
| createBlobWithContent(key, store, file); |
| |
| // Blostore launch command with topology blobstore map |
| // Here we are giving it a local name so that we can read from the file |
| // bin/storm jar examples/storm-starter/storm-starter-topologies-0.11.0-SNAPSHOT.jar |
| // org.apache.storm.starter.BlobStoreAPIWordCountTopology bl -c |
| // topology.blobstore.map='{"key":{"localname":"blacklist.txt", "uncompress":"false"}}' |
| wc.buildAndLaunchWordCountTopology(args); |
| |
| // Updating file few times every 5 seconds |
| for (int i = 0; i < 10; i++) { |
| updateBlobWithContent(key, store, updateFile(file)); |
| Utils.sleep(5000); |
| } |
| } catch (KeyAlreadyExistsException kae) { |
| LOG.info("Key already exists {}", kae); |
| } catch (AuthorizationException | KeyNotFoundException | IOException exp) { |
| throw new RuntimeException(exp); |
| } |
| } |
| |
| public void buildAndLaunchWordCountTopology(String[] args) { |
| TopologyBuilder builder = new TopologyBuilder(); |
| builder.setSpout("spout", new RandomSentenceSpout(), 5); |
| builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); |
| builder.setBolt("filter", new FilterWords(), 6).shuffleGrouping("split"); |
| |
| Config conf = new Config(); |
| conf.setDebug(true); |
| try { |
| conf.setNumWorkers(3); |
| StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); |
| } catch (InvalidTopologyException | AuthorizationException | AlreadyAliveException exp) { |
| throw new RuntimeException(exp); |
| } |
| } |
| |
| // Spout implementation |
| public static class RandomSentenceSpout extends BaseRichSpout { |
| SpoutOutputCollector collector; |
| |
| @Override |
| public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) { |
| this.collector = collector; |
| } |
| |
| @Override |
| public void nextTuple() { |
| Utils.sleep(100); |
| collector.emit(new Values(getRandomSentence())); |
| } |
| |
| @Override |
| public void ack(Object id) { |
| } |
| |
| @Override |
| public void fail(Object id) { |
| } |
| |
| @Override |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| declarer.declare(new Fields("sentence")); |
| } |
| |
| } |
| |
| // Bolt implementation |
    /**
     * Multilang bolt that delegates to an external python script (splitsentence.py)
     * via ShellBolt; emits on the single field "word" (presumably one word per
     * incoming sentence token — the actual splitting lives in the script).
     */
    public static class SplitSentence extends ShellBolt implements IRichBolt {

        public SplitSentence() {
            // Launch the external process that performs the actual work.
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            // No component-specific configuration for this bolt.
            return null;
        }
    }
| |
| public static class FilterWords extends BaseBasicBolt { |
| boolean poll = false; |
| long pollTime; |
| Set<String> wordSet; |
| |
| @Override |
| public void execute(Tuple tuple, BasicOutputCollector collector) { |
| String word = tuple.getString(0); |
| // Thread Polling every 5 seconds to update the wordSet seconds which is |
| // used in FilterWords bolt to filter the words |
| try { |
| if (!poll) { |
| wordSet = parseFile(fileName); |
| pollTime = System.currentTimeMillis(); |
| poll = true; |
| } else { |
| if ((System.currentTimeMillis() - pollTime) > 5000) { |
| wordSet = parseFile(fileName); |
| pollTime = System.currentTimeMillis(); |
| } |
| } |
| } catch (IOException exp) { |
| throw new RuntimeException(exp); |
| } |
| if (wordSet != null && !wordSet.contains(word)) { |
| collector.emit(new Values(word)); |
| } |
| } |
| |
| @Override |
| public void declareOutputFields(OutputFieldsDeclarer declarer) { |
| declarer.declare(new Fields("word")); |
| } |
| } |
| } |
| |
| |