blob: 26def333ce3b079c42a18ff4353d61a755564e39 [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.zookeeper
(:import [org.apache.curator.retry RetryNTimes]
[backtype.storm Config])
(:import [org.apache.curator.framework.api CuratorEvent CuratorEventType CuratorListener UnhandledErrorListener])
(:import [org.apache.curator.framework.state ConnectionStateListener])
(:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory])
(:import [org.apache.curator.framework.recipes.leader LeaderLatch LeaderLatch$State Participant LeaderLatchListener])
(:import [org.apache.zookeeper ZooKeeper Watcher KeeperException$NoNodeException
ZooDefs ZooDefs$Ids CreateMode WatchedEvent Watcher$Event Watcher$Event$KeeperState
Watcher$Event$EventType KeeperException$NodeExistsException])
(:import [org.apache.zookeeper.data Stat])
(:import [org.apache.zookeeper.server ZooKeeperServer NIOServerCnxnFactory])
(:import [java.net InetSocketAddress BindException InetAddress])
(:import [backtype.storm.nimbus ILeaderElector NimbusInfo])
(:import [java.io File])
(:import [java.util List Map])
(:import [backtype.storm.utils Utils ZookeeperAuthInfo])
(:use [backtype.storm util log config]))
(def zk-keeper-states
{Watcher$Event$KeeperState/Disconnected :disconnected
Watcher$Event$KeeperState/SyncConnected :connected
Watcher$Event$KeeperState/AuthFailed :auth-failed
Watcher$Event$KeeperState/Expired :expired})
(def zk-event-types
{Watcher$Event$EventType/None :none
Watcher$Event$EventType/NodeCreated :node-created
Watcher$Event$EventType/NodeDeleted :node-deleted
Watcher$Event$EventType/NodeDataChanged :node-data-changed
Watcher$Event$EventType/NodeChildrenChanged :node-children-changed})
(defn- default-watcher
[state type path]
(log-message "Zookeeper state update: " state type path))
(defnk mk-client
[conf servers port
:root ""
:watcher default-watcher
:auth-conf nil]
(let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
(.. fk
(getCuratorListenable)
(addListener
(reify CuratorListener
(^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
(when (= (.getType e) CuratorEventType/WATCHED)
(let [^WatchedEvent event (.getWatchedEvent e)]
(watcher (zk-keeper-states (.getState event))
(zk-event-types (.getType event))
(.getPath event))))))))
;; (.. fk
;; (getUnhandledErrorListenable)
;; (addListener
;; (reify UnhandledErrorListener
;; (unhandledError [this msg error]
;; (if (or (exception-cause? InterruptedException error)
;; (exception-cause? java.nio.channels.ClosedByInterruptException error))
;; (do (log-warn-error error "Zookeeper exception " msg)
;; (let [to-throw (InterruptedException.)]
;; (.initCause to-throw error)
;; (throw to-throw)
;; ))
;; (do (log-error error "Unrecoverable Zookeeper error " msg)
;; (halt-process! 1 "Unrecoverable Zookeeper error")))
;; ))))
(.start fk)
fk))
(def zk-create-modes
{:ephemeral CreateMode/EPHEMERAL
:persistent CreateMode/PERSISTENT
:sequential CreateMode/PERSISTENT_SEQUENTIAL})
(defn create-node
([^CuratorFramework zk ^String path ^bytes data mode acls]
(let [mode (zk-create-modes mode)]
(try
(.. zk (create) (creatingParentsIfNeeded) (withMode mode) (withACL acls) (forPath (normalize-path path) data))
(catch Exception e (throw (wrap-in-runtime e))))))
([^CuratorFramework zk ^String path ^bytes data acls]
(create-node zk path data :persistent acls)))
(defn exists-node?
[^CuratorFramework zk ^String path watch?]
((complement nil?)
(try
(if watch?
(.. zk (checkExists) (watched) (forPath (normalize-path path)))
(.. zk (checkExists) (forPath (normalize-path path))))
(catch Exception e (throw (wrap-in-runtime e))))))
(defnk delete-node
[^CuratorFramework zk ^String path]
(let [path (normalize-path path)]
(when (exists-node? zk path false)
(try-cause (.. zk (delete) (deletingChildrenIfNeeded) (forPath (normalize-path path)))
(catch KeeperException$NoNodeException e
;; do nothing
)
(catch Exception e (throw (wrap-in-runtime e)))))))
(defn mkdirs
[^CuratorFramework zk ^String path acls]
(let [path (normalize-path path)]
(when-not (or (= path "/") (exists-node? zk path false))
(mkdirs zk (parent-path path) acls)
(try-cause
(create-node zk path (barr 7) :persistent acls)
(catch KeeperException$NodeExistsException e
;; this can happen when multiple clients doing mkdir at same time
))
)))
(defn sync-path
[^CuratorFramework zk ^String path]
(try
(.. zk (sync) (forPath (normalize-path path)))
(catch Exception e (throw (wrap-in-runtime e)))))
(defn add-listener [^CuratorFramework zk ^ConnectionStateListener listener]
(.. zk (getConnectionStateListenable) (addListener listener)))
(defn get-data
[^CuratorFramework zk ^String path watch?]
(let [path (normalize-path path)]
(try-cause
(if (exists-node? zk path watch?)
(if watch?
(.. zk (getData) (watched) (forPath path))
(.. zk (getData) (forPath path))))
(catch KeeperException$NoNodeException e
;; this is fine b/c we still have a watch from the successful exists call
nil )
(catch Exception e (throw (wrap-in-runtime e))))))
(defn get-data-with-version
[^CuratorFramework zk ^String path watch?]
(let [stats (org.apache.zookeeper.data.Stat. )
path (normalize-path path)]
(try-cause
(if-let [data
(if (exists-node? zk path watch?)
(if watch?
(.. zk (getData) (watched) (storingStatIn stats) (forPath path))
(.. zk (getData) (storingStatIn stats) (forPath path))))]
{:data data
:version (.getVersion stats)})
(catch KeeperException$NoNodeException e
;; this is fine b/c we still have a watch from the successful exists call
nil ))))
(defn get-version
[^CuratorFramework zk ^String path watch?]
(if-let [stats
(if watch?
(.. zk (checkExists) (watched) (forPath (normalize-path path)))
(.. zk (checkExists) (forPath (normalize-path path))))]
(.getVersion stats)
nil))
(defn get-children
[^CuratorFramework zk ^String path watch?]
(try
(if watch?
(.. zk (getChildren) (watched) (forPath (normalize-path path)))
(.. zk (getChildren) (forPath (normalize-path path))))
(catch Exception e (throw (wrap-in-runtime e)))))
(defn set-data
[^CuratorFramework zk ^String path ^bytes data]
(try
(.. zk (setData) (forPath (normalize-path path) data))
(catch Exception e (throw (wrap-in-runtime e)))))
(defn exists
[^CuratorFramework zk ^String path watch?]
(exists-node? zk path watch?))
(defnk mk-inprocess-zookeeper
[localdir :port nil]
(let [localfile (File. localdir)
zk (ZooKeeperServer. localfile localfile 2000)
[retport factory]
(loop [retport (if port port 2000)]
(if-let [factory-tmp
(try-cause
(doto (NIOServerCnxnFactory.)
(.configure (InetSocketAddress. retport) 0))
(catch BindException e
(when (> (inc retport) (if port port 65535))
(throw (RuntimeException.
"No port is available to launch an inprocess zookeeper.")))))]
[retport factory-tmp]
(recur (inc retport))))]
(log-message "Starting inprocess zookeeper at port " retport " and dir " localdir)
(.startup factory zk)
[retport factory]))
(defn shutdown-inprocess-zookeeper
[handle]
(.shutdown handle))
(defn- to-NimbusInfo [^Participant participant]
(let
[id (if (clojure.string/blank? (.getId participant))
(throw (RuntimeException. "No nimbus leader participant host found, have you started your nimbus hosts?"))
(.getId participant))
nimbus-info (NimbusInfo/parse id)]
(.setLeader nimbus-info (.isLeader participant))
nimbus-info))
(defn leader-latch-listener-impl
"Leader latch listener that will be invoked when we either gain or lose leadership"
[conf zk leader-latch]
(let [hostname (.getCanonicalHostName (InetAddress/getLocalHost))
STORMS-ROOT (str (conf STORM-ZOOKEEPER-ROOT) "/storms")]
(reify LeaderLatchListener
(^void isLeader[this]
(log-message (str hostname " gained leadership, checking if it has all the topology code locally."))
(let [active-topology-ids (set (get-children zk STORMS-ROOT false))
local-topology-ids (set (.list (File. (master-stormdist-root conf))))
diff-topology (first (set-delta active-topology-ids local-topology-ids))]
(log-message "active-topology-ids [" (clojure.string/join "," active-topology-ids)
"] local-topology-ids [" (clojure.string/join "," local-topology-ids)
"] diff-topology [" (clojure.string/join "," diff-topology) "]")
(if (empty? diff-topology)
(log-message "Accepting leadership, all active topology found localy.")
(do
(log-message "code for all active topologies not available locally, giving up leadership.")
(.close leader-latch)))))
(^void notLeader[this]
(log-message (str hostname " lost leadership."))))))
(defn zk-leader-elector
"Zookeeper Implementation of ILeaderElector."
[conf]
(let [servers (conf STORM-ZOOKEEPER-SERVERS)
zk (mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)
leader-lock-path (str (conf STORM-ZOOKEEPER-ROOT) "/leader-lock")
id (.toHostPortString (NimbusInfo/fromConf conf))
leader-latch (atom (LeaderLatch. zk leader-lock-path id))
leader-latch-listener (atom (leader-latch-listener-impl conf zk @leader-latch))
]
(reify ILeaderElector
(prepare [this conf]
(log-message "no-op for zookeeper implementation"))
(^void addToLeaderLockQueue [this]
;if this latch is already closed, we need to create new instance.
(if (.equals LeaderLatch$State/CLOSED (.getState @leader-latch))
(do
(reset! leader-latch (LeaderLatch. zk leader-lock-path id))
(reset! leader-latch-listener (leader-latch-listener-impl conf zk @leader-latch))
(log-message "LeaderLatch was in closed state. Resetted the leaderLatch and listeners.")
))
;Only if the latch is not already started we invoke start.
(if (.equals LeaderLatch$State/LATENT (.getState @leader-latch))
(do
(.addListener @leader-latch @leader-latch-listener)
(.start @leader-latch)
(log-message "Queued up for leader lock."))
(log-message "Node already in queue for leader lock.")))
(^void removeFromLeaderLockQueue [this]
;Only started latches can be closed.
(if (.equals LeaderLatch$State/STARTED (.getState @leader-latch))
(do
(.close @leader-latch)
(log-message "Removed from leader lock queue."))
(log-message "leader latch is not started so no removeFromLeaderLockQueue needed.")))
(^boolean isLeader [this]
(.hasLeadership @leader-latch))
(^NimbusInfo getLeader [this]
(to-NimbusInfo (.getLeader @leader-latch)))
(^List getAllNimbuses [this]
(let [participants (.getParticipants @leader-latch)]
(map (fn [^Participant participant]
(to-NimbusInfo participant))
participants)))
(^void close[this]
(log-message "closing zookeeper connection of leader elector.")
(.close zk)))))