(ns backtype.storm.scheduler.IsolationScheduler
(:use [backtype.storm util config log])
(:require [backtype.storm.scheduler.DefaultScheduler :as DefaultScheduler])
(:import [java.util HashSet Set List LinkedList ArrayList Map HashMap])
(:import [backtype.storm.scheduler IScheduler Topologies
Cluster TopologyDetails WorkerSlot SchedulerAssignment
EvenScheduler ExecutorDetails])
:init init
:constructors {[] []}
:state state
:implements [backtype.storm.scheduler.IScheduler]))
(defn -init []
[[] (container)])
(defn -prepare [this conf]
(container-set! (.state this) conf))
(defn- compute-worker-specs "Returns list of sets of executors"
[^TopologyDetails details]
(->> (.getExecutorToComponent details)
(map second)
(apply interleave-all)
(partition-fixed (.getNumWorkers details))
(map set)))
(defn- compute-worker-specs "Returns mutable set of sets of executors"
[^TopologyDetails details]
(->> (.getExecutorToComponent details)
(map second)
(apply concat)
(map vector (repeat-seq (range (.getNumWorkers details))))
(group-by first)
(map-val #(map second %))
(map set)
(defn isolated-topologies [conf topologies]
(let [tset (-> conf (get ISOLATION-SCHEDULER-MACHINES) keys set)]
(filter (fn [^TopologyDetails t] (contains? tset (.getName t))) topologies)
;; map from topology id -> set of sets of executors
(defn topology-worker-specs [iso-topologies]
(->> iso-topologies
(map (fn [t] {(.getId t) (compute-worker-specs t)}))
(apply merge)))
(defn machine-distribution [conf ^TopologyDetails topology]
(let [name->machines (get conf ISOLATION-SCHEDULER-MACHINES)
machines (get name->machines (.getName topology))
workers (.getNumWorkers topology)]
(-> (integer-divided workers machines)
(dissoc 0)
(defn topology-machine-distribution [conf iso-topologies]
(->> iso-topologies
(map (fn [t] {(.getId t) (machine-distribution conf t)}))
(apply merge)))
(defn host-assignments [^Cluster cluster]
(letfn [(to-slot-specs [^SchedulerAssignment ass]
(->> ass
(map (fn [[slot executors]]
[slot (.getTopologyId ass) (set executors)]))))]
(->> cluster
(mapcat to-slot-specs)
(group-by (fn [[^WorkerSlot slot & _]] (.getHost cluster (.getNodeId slot))))
(defn- decrement-distribution! [^Map distribution value]
(let [v (-> distribution (get value) dec)]
(if (zero? v)
(.remove distribution value)
(.put distribution value v))))
;; returns list of list of slots, reverse sorted by number of slots
(defn- host-assignable-slots [^Cluster cluster]
(-<> cluster
(group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
(dissoc <> nil)
(sort-by #(-> % second count -) <>)
(LinkedList. <>)
(defn- host->used-slots [^Cluster cluster]
(->> cluster
(group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)))
(defn- distribution->sorted-amts [distribution]
(->> distribution
(mapcat (fn [[val amt]] (repeat amt val)))
(sort-by -)
(defn- allocated-topologies [topology-worker-specs]
(->> topology-worker-specs
(filter (fn [[_ worker-specs]] (empty? worker-specs)))
(map first)
(defn- leftover-topologies [^Topologies topologies filter-ids-set]
(->> topologies
(filter (fn [^TopologyDetails t] (not (contains? filter-ids-set (.getId t)))))
(map (fn [^TopologyDetails t] {(.getId t) t}))
(apply merge)
;; for each isolated topology:
;; compute even distribution of executors -> workers on the number of workers specified for the topology
;; compute distribution of workers to machines
;; determine host -> list of [slot, topology id, executors]
;; iterate through hosts and: a machine is good if:
;; 1. only running workers from one isolated topology
;; 2. all workers running on it match one of the distributions of executors for that topology
;; 3. matches one of the # of workers
;; blacklist the good hosts and remove those workers from the list of need to be assigned workers
;; otherwise unassign all other workers for isolated topologies if assigned
(defn remove-elem-from-set! [^Set aset]
(let [elem (-> aset .iterator .next)]
(.remove aset elem)
;; get host -> all assignable worker slots for non-blacklisted machines (assigned or not assigned)
;; will then have a list of machines that need to be assigned (machine -> [topology, list of list of executors])
;; match each spec to a machine (who has the right number of workers), free everything else on that machine and assign those slots (do one topology at a time)
;; blacklist all machines who had production slots defined
;; log isolated topologies who weren't able to get enough slots / machines
;; run default scheduler on isolated topologies that didn't have enough slots + non-isolated topologies on remaining machines
;; set blacklist to what it was initially
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
(let [conf (container-get (.state this))
orig-blacklist (HashSet. (.getBlacklistedHosts cluster))
iso-topologies (isolated-topologies conf (.getTopologies topologies))
iso-ids-set (->> iso-topologies (map #(.getId ^TopologyDetails %)) set)
topology-worker-specs (topology-worker-specs iso-topologies)
topology-machine-distribution (topology-machine-distribution conf iso-topologies)
host-assignments (host-assignments cluster)]
(doseq [[host assignments] host-assignments]
(let [top-id (-> assignments first second)
distribution (get topology-machine-distribution top-id)
^Set worker-specs (get topology-worker-specs top-id)
num-workers (count assignments)
(if (and (contains? iso-ids-set top-id)
(every? #(= (second %) top-id) assignments)
(contains? distribution num-workers)
(every? #(contains? worker-specs (nth % 2)) assignments))
(do (decrement-distribution! distribution num-workers)
(doseq [[_ _ executors] assignments] (.remove worker-specs executors))
(.blacklistHost cluster host))
(doseq [[slot top-id _] assignments]
(when (contains? iso-ids-set top-id)
(.freeSlot cluster slot)
(let [host->used-slots (host->used-slots cluster)
^LinkedList sorted-assignable-hosts (host-assignable-slots cluster)]
;; TODO: can improve things further by ordering topologies in terms of who needs the least workers
(doseq [[top-id worker-specs] topology-worker-specs
:let [amts (distribution->sorted-amts (get topology-machine-distribution top-id))]]
(doseq [amt amts
:let [[host host-slots] (.peek sorted-assignable-hosts)]]
(when (and host-slots (>= (count host-slots) amt))
(.poll sorted-assignable-hosts)
(.freeSlots cluster (get host->used-slots host))
(doseq [slot (take amt host-slots)
:let [executors-set (remove-elem-from-set! worker-specs)]]
(.assign cluster slot top-id executors-set))
(.blacklistHost cluster host))
(let [failed-iso-topologies (->> topology-worker-specs
(mapcat (fn [[top-id worker-specs]]
(if-not (empty? worker-specs) [top-id])
(if (empty? failed-iso-topologies)
;; run default scheduler on non-isolated topologies
(-<> topology-worker-specs
(leftover-topologies topologies <>)
(DefaultScheduler/default-schedule <> cluster))
(log-warn "Unstable to isolate topologies " (pr-str failed-iso-topologies) ". Will wait for enough resources for isolated topologies before allocating any other resources.")
(.setBlacklistedHosts cluster orig-blacklist)