| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| (ns storm.starter.clj.word-count |
| (:import [backtype.storm StormSubmitter LocalCluster]) |
| (:use [backtype.storm clojure config]) |
| (:gen-class)) |
| |
;; Unreliable spout declaring a single output field, "sentence".
;; Each nextTuple call sleeps 100 ms and then emits one sentence chosen at
;; random from a small fixed pool.
(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [pool ["a little brown dog"
              "the man petted the dog"
              "four score and seven years ago"
              "an apple a day keeps the doctor away"]]
    (spout
      (nextTuple []
        ;; Pause before every emit so the example does not spin.
        (Thread/sleep 100)
        (let [sentence (rand-nth pool)]
          (emit-spout! collector [sentence])))
      (ack [id]
        ;; Intentionally empty. Only reliable spouts (for example one that
        ;; reads off a queue such as Kestrel) need to implement ack; this
        ;; spout is unreliable, so there is nothing to do here.
        ))))
| |
;; Parameterized spout: the collection of sentences is passed in through
;; :params when the spout is instantiated. Each time the body runs it sleeps
;; 500 ms and then emits one randomly chosen sentence.
;; NOTE(review): the declared output field is "word" although every emitted
;; value is a whole sentence; the name is kept as-is so the declared output
;; schema does not change.
(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
  [collector]
  (Thread/sleep 500)
  (let [sentence (rand-nth sentences)]
    (emit-spout! collector [sentence])))
| |
;; Reads the string in field 0 of the incoming tuple, splits it on single
;; spaces and emits one "word" tuple per piece, each anchored to the input
;; tuple. The input tuple is acked after all pieces have been emitted.
(defbolt split-sentence ["word"] [tuple collector]
  (let [sentence (.getString tuple 0)]
    (doseq [piece (.split sentence " ")]
      (emit-bolt! collector [piece] :anchor tuple))
    (ack! collector tuple)))
| |
;; Keeps a running tally of how often each word has been seen and, for every
;; incoming tuple, emits [word count] anchored to that tuple before acking it.
;; The tally lives in an atom created in the prepare body.
(defbolt word-count ["word" "count"] {:prepare true}
  [conf context collector]
  ;; word -> number of occurrences seen so far
  (let [counts (atom {})]
    (bolt
      (execute [tuple]
        (let [word    (.getString tuple 0)
              ;; swap! returns the map it installed, so the count is read
              ;; from that value instead of dereferencing the atom a second
              ;; time; the emitted number is exactly the one this update
              ;; produced. (fnil inc 0) starts unseen words at 1.
              updated (swap! counts update-in [word] (fnil inc 0))]
          (emit-bolt! collector [word (get updated word)] :anchor tuple)
          (ack! collector tuple))))))
| |
(defn mk-topology
  "Builds the word-count topology.

  Spouts: id 1 is sentence-spout; id 2 is sentence-spout-parameterized
  instantiated with two fixed sentences and :p 2.
  Bolts: id 3 is split-sentence, shuffle-grouped on both spouts, :p 5;
  id 4 is word-count, consuming bolt 3 grouped on the word field, :p 6."
  []
  (let [spouts {"1" (spout-spec sentence-spout)
                "2" (spout-spec (sentence-spout-parameterized
                                  ["the cat jumped over the door"
                                   "greetings from a faraway land"])
                                :p 2)}
        bolts  {"3" (bolt-spec {"1" :shuffle "2" :shuffle}
                               split-sentence
                               :p 5)
                "4" (bolt-spec {"3" ["word"]}
                               word-count
                               :p 6)}]
    (topology spouts bolts)))
| |
(defn run-local!
  "Runs the topology on a LocalCluster under the name word-count with
  TOPOLOGY-DEBUG enabled, lets it run for 10 seconds, then shuts the
  cluster down.

  The shutdown sits in a finally clause so the cluster is also shut down
  when submitting the topology or the sleep throws (for example on
  interruption). Returns nil."
  []
  (let [cluster (LocalCluster.)]
    (try
      (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
      ;; Give the topology ten seconds to process tuples.
      (Thread/sleep 10000)
      (finally
        (.shutdown cluster)))))
| |
(defn submit-topology!
  "Submits the word-count topology through StormSubmitter under `name`,
  with TOPOLOGY-DEBUG set to true and TOPOLOGY-WORKERS set to 3."
  [name]
  (let [conf {TOPOLOGY-DEBUG true
              TOPOLOGY-WORKERS 3}]
    (StormSubmitter/submitTopology name conf (mk-topology))))
| |
(defn -main
  "Program entry point.

  With one argument, submits the topology under that name via
  submit-topology!. With no arguments, runs it via run-local!."
  ([name]
   (submit-topology! name))
  ([]
   (run-local!)))
| |