| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| (ns backtype.storm.scheduler.resource-aware-scheduler-test |
| (:use [clojure test]) |
| (:use [backtype.storm config testing thrift]) |
| (:require [backtype.storm.util :refer [map-val reverse-map sum]]) |
| (:require [backtype.storm.daemon [nimbus :as nimbus]]) |
| (:import [backtype.storm.generated StormTopology] |
| [backtype.storm Config] |
| [backtype.storm.testing TestWordSpout TestWordCounter] |
| [backtype.storm.topology TopologyBuilder]) |
| (:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails |
| SchedulerAssignmentImpl Topologies TopologyDetails]) |
| (:import [backtype.storm.scheduler.resource RAS_Node ResourceAwareScheduler]) |
| (:import [backtype.storm Config StormSubmitter]) |
| (:import [backtype.storm LocalDRPC LocalCluster]) |
| (:import [java.util HashMap])) |
| |
| (defn gen-supervisors [count ports] |
| (into {} (for [id (range count) |
| :let [supervisor (SupervisorDetails. (str "id" id) |
| (str "host" id) |
| (list ) (map int (range ports)) |
| {Config/SUPERVISOR_MEMORY_CAPACITY_MB 2000.0 |
| Config/SUPERVISOR_CPU_CAPACITY 400.0})]] |
| {(.getId supervisor) supervisor}))) |
| |
| (defn to-top-map [topologies] |
| (into {} (for [top topologies] {(.getId top) top}))) |
| |
| (defn ed [id] (ExecutorDetails. (int id) (int id))) |
| |
| (defn mk-ed-map [arg] |
| (into {} |
| (for [[name start end] arg] |
| (into {} |
| (for [at (range start end)] |
| {(ed at) name}))))) |
| |
| ;; get the super->mem HashMap by counting the eds' mem usage of all topos on each super |
| (defn get-super->mem-usage [^Cluster cluster ^Topologies topologies] |
| (let [assignments (.values (.getAssignments cluster)) |
| supers (.values (.getSupervisors cluster)) |
| super->mem-usage (HashMap.) |
| _ (doseq [super supers] |
| (.put super->mem-usage super 0))] ;; initialize the mem-usage as 0 for all supers |
| (doseq [assignment assignments] |
| (let [ed->super (into {} |
| (for [[ed slot] (.getExecutorToSlot assignment)] |
| {ed (.getSupervisorById cluster (.getNodeId slot))})) |
| super->eds (reverse-map ed->super) |
| topology (.getById topologies (.getTopologyId assignment)) |
| super->mem-pertopo (map-val (fn [eds] |
| (reduce + (map #(.getTotalMemReqTask topology %) eds))) |
| super->eds)] ;; sum up the one topo's eds' mem usage on a super |
| (doseq [[super mem] super->mem-pertopo] |
| (.put super->mem-usage |
| super (+ mem (.get super->mem-usage super)))))) ;; add all topo's mem usage for each super |
| super->mem-usage)) |
| |
| ;; get the super->cpu HashMap by counting the eds' cpu usage of all topos on each super |
| (defn get-super->cpu-usage [^Cluster cluster ^Topologies topologies] |
| (let [assignments (.values (.getAssignments cluster)) |
| supers (.values (.getSupervisors cluster)) |
| super->cpu-usage (HashMap.) |
| _ (doseq [super supers] |
| (.put super->cpu-usage super 0))] ;; initialize the cpu-usage as 0 for all supers |
| (doseq [assignment assignments] |
| (let [ed->super (into {} |
| (for [[ed slot] (.getExecutorToSlot assignment)] |
| {ed (.getSupervisorById cluster (.getNodeId slot))})) |
| super->eds (reverse-map ed->super) |
| topology (.getById topologies (.getTopologyId assignment)) |
| super->cpu-pertopo (map-val (fn [eds] |
| (reduce + (map #(.getTotalCpuReqTask topology %) eds))) |
| super->eds)] ;; sum up the one topo's eds' cpu usage on a super |
| (doseq [[super cpu] super->cpu-pertopo] |
| (.put super->cpu-usage |
| super (+ cpu (.get super->cpu-usage super)))))) ;; add all topo's cpu usage for each super |
| super->cpu-usage)) |
| |
| ;; testing resource/Node class |
| (deftest test-node |
| (let [supers (gen-supervisors 5 4) |
| cluster (Cluster. (nimbus/standalone-nimbus) supers {} {}) |
| topologies (Topologies. (to-top-map [])) |
| node-map (RAS_Node/getAllNodesFrom cluster topologies) |
| topology1 (TopologyDetails. "topology1" {} nil 0) |
| topology2 (TopologyDetails. "topology2" {} nil 0)] |
| (is (= 5 (.size node-map))) |
| (let [node (.get node-map "id0")] |
| (is (= "id0" (.getId node))) |
| (is (= true (.isAlive node))) |
| (is (= 0 (.size (.getRunningTopologies node)))) |
| (is (= true (.isTotallyFree node))) |
| (is (= 4 (.totalSlotsFree node))) |
| (is (= 0 (.totalSlotsUsed node))) |
| (is (= 4 (.totalSlots node))) |
| (.assign node topology1 (list (ExecutorDetails. 1 1)) cluster) |
| (is (= 1 (.size (.getRunningTopologies node)))) |
| (is (= false (.isTotallyFree node))) |
| (is (= 3 (.totalSlotsFree node))) |
| (is (= 1 (.totalSlotsUsed node))) |
| (is (= 4 (.totalSlots node))) |
| (.assign node topology1 (list (ExecutorDetails. 2 2)) cluster) |
| (is (= 1 (.size (.getRunningTopologies node)))) |
| (is (= false (.isTotallyFree node))) |
| (is (= 2 (.totalSlotsFree node))) |
| (is (= 2 (.totalSlotsUsed node))) |
| (is (= 4 (.totalSlots node))) |
| (.assign node topology2 (list (ExecutorDetails. 1 1)) cluster) |
| (is (= 2 (.size (.getRunningTopologies node)))) |
| (is (= false (.isTotallyFree node))) |
| (is (= 1 (.totalSlotsFree node))) |
| (is (= 3 (.totalSlotsUsed node))) |
| (is (= 4 (.totalSlots node))) |
| (.assign node topology2 (list (ExecutorDetails. 2 2)) cluster) |
| (is (= 2 (.size (.getRunningTopologies node)))) |
| (is (= false (.isTotallyFree node))) |
| (is (= 0 (.totalSlotsFree node))) |
| (is (= 4 (.totalSlotsUsed node))) |
| (is (= 4 (.totalSlots node))) |
| (.freeAllSlots node cluster) |
| (is (= 0 (.size (.getRunningTopologies node)))) |
| (is (= true (.isTotallyFree node))) |
| (is (= 4 (.totalSlotsFree node))) |
| (is (= 0 (.totalSlotsUsed node))) |
| (is (= 4 (.totalSlots node))) |
| ))) |
| |
| (deftest test-sanity-resource-aware-scheduler |
| (let [builder (TopologyBuilder.) |
| _ (.setSpout builder "wordSpout" (TestWordSpout.) 1) |
| _ (.shuffleGrouping (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) "wordSpout") |
| supers (gen-supervisors 1 2) |
| storm-topology (.createTopology builder) |
| topology1 (TopologyDetails. "topology1" |
| {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology |
| 1 |
| (mk-ed-map [["wordSpout" 0 1] |
| ["wordCountBolt" 1 2]])) |
| cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1])) |
| node-map (RAS_Node/getAllNodesFrom cluster topologies) |
| scheduler (ResourceAwareScheduler.)] |
| (.schedule scheduler topologies cluster) |
| (let [assignment (.getAssignmentById cluster "topology1") |
| assigned-slots (.getSlots assignment) |
| executors (.getExecutors assignment)] |
| (is (= 1 (.size assigned-slots))) |
| (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot)))))) |
| (is (= 2 (.size executors)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1"))))) |
| |
| (deftest test-topology-with-multiple-spouts |
| (let [builder1 (TopologyBuilder.) ;; a topology with multiple spouts |
| _ (.setSpout builder1 "wordSpout1" (TestWordSpout.) 1) |
| _ (.setSpout builder1 "wordSpout2" (TestWordSpout.) 1) |
| _ (doto |
| (.setBolt builder1 "wordCountBolt1" (TestWordCounter.) 1) |
| (.shuffleGrouping "wordSpout1") |
| (.shuffleGrouping "wordSpout2")) |
| _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt2" (TestWordCounter.) 1) "wordCountBolt1") |
| _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt3" (TestWordCounter.) 1) "wordCountBolt1") |
| _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt4" (TestWordCounter.) 1) "wordCountBolt2") |
| _ (.shuffleGrouping (.setBolt builder1 "wordCountBolt5" (TestWordCounter.) 1) "wordSpout2") |
| storm-topology1 (.createTopology builder1) |
| topology1 (TopologyDetails. "topology1" |
| {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology1 |
| 1 |
| (mk-ed-map [["wordSpout1" 0 1] |
| ["wordSpout2" 1 2] |
| ["wordCountBolt1" 2 3] |
| ["wordCountBolt2" 3 4] |
| ["wordCountBolt3" 4 5] |
| ["wordCountBolt4" 5 6] |
| ["wordCountBolt5" 6 7]])) |
| builder2 (TopologyBuilder.) ;; a topology with two unconnected partitions |
| _ (.setSpout builder2 "wordSpoutX" (TestWordSpout.) 1) |
| _ (.setSpout builder2 "wordSpoutY" (TestWordSpout.) 1) |
| storm-topology2 (.createTopology builder1) |
| topology2 (TopologyDetails. "topology2" |
| {TOPOLOGY-NAME "topology-name-2" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology2 |
| 1 |
| (mk-ed-map [["wordSpoutX" 0 1] |
| ["wordSpoutY" 1 2]])) |
| supers (gen-supervisors 2 4) |
| cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1 topology2])) |
| scheduler (ResourceAwareScheduler.)] |
| (.schedule scheduler topologies cluster) |
| (let [assignment (.getAssignmentById cluster "topology1") |
| assigned-slots (.getSlots assignment) |
| executors (.getExecutors assignment)] |
| (is (= 1 (.size assigned-slots))) |
| (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot)))))) |
| (is (= 7 (.size executors)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1"))) |
| (let [assignment (.getAssignmentById cluster "topology2") |
| assigned-slots (.getSlots assignment) |
| executors (.getExecutors assignment)] |
| (is (= 1 (.size assigned-slots))) |
| (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot)))))) |
| (is (= 2 (.size executors)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2"))))) |
| |
| (deftest test-topology-set-memory-and-cpu-load |
| (let [builder (TopologyBuilder.) |
| _ (.setSpout builder "wordSpout" (TestWordSpout.) 1) |
| _ (doto |
| (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) |
| (.setMemoryLoad 110.0) |
| (.setCPULoad 20.0) |
| (.shuffleGrouping "wordSpout")) |
| supers (gen-supervisors 2 2) ;; to test whether two tasks will be assigned to one or two nodes |
| storm-topology (.createTopology builder) |
| topology2 (TopologyDetails. "topology2" |
| {TOPOLOGY-NAME "topology-name-2" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology |
| 2 |
| (mk-ed-map [["wordSpout" 0 1] |
| ["wordCountBolt" 1 2]])) |
| cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.testing.AlternateRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology2])) |
| scheduler (ResourceAwareScheduler.)] |
| (.schedule scheduler topologies cluster) |
| (let [assignment (.getAssignmentById cluster "topology2") |
| assigned-slots (.getSlots assignment) |
| executors (.getExecutors assignment)] |
| ;; 4 slots on 1 machine, all executors assigned |
| (is (= 1 (.size assigned-slots))) |
| (is (= 1 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot)))))) |
| (is (= 2 (.size executors)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2"))))) |
| |
| (deftest test-resource-limitation |
| (let [builder (TopologyBuilder.) |
| _ (doto (.setSpout builder "wordSpout" (TestWordSpout.) 2) |
| (.setMemoryLoad 1000.0 200.0) |
| (.setCPULoad 250.0)) |
| _ (doto (.setBolt builder "wordCountBolt" (TestWordCounter.) 1) |
| (.shuffleGrouping "wordSpout") |
| (.setMemoryLoad 500.0 100.0) |
| (.setCPULoad 100.0)) |
| supers (gen-supervisors 2 2) ;; need at least two nodes to hold these executors |
| storm-topology (.createTopology builder) |
| topology1 (TopologyDetails. "topology1" |
| {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology |
| 2 ;; need two workers, each on one node |
| (mk-ed-map [["wordSpout" 0 2] |
| ["wordCountBolt" 2 3]])) |
| cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1])) |
| scheduler (ResourceAwareScheduler.)] |
| (.schedule scheduler topologies cluster) |
| (let [assignment (.getAssignmentById cluster "topology1") |
| assigned-slots (.getSlots assignment) |
| node-ids (map #(.getNodeId %) assigned-slots) |
| executors (.getExecutors assignment) |
| epsilon 0.000001 |
| assigned-ed-mem (sort (map #(.getTotalMemReqTask topology1 %) executors)) |
| assigned-ed-cpu (sort (map #(.getTotalCpuReqTask topology1 %) executors)) |
| ed->super (into {} |
| (for [[ed slot] (.getExecutorToSlot assignment)] |
| {ed (.getSupervisorById cluster (.getNodeId slot))})) |
| super->eds (reverse-map ed->super) |
| mem-avail->used (into [] |
| (for [[super eds] super->eds] |
| [(.getTotalMemory super) (sum (map #(.getTotalMemReqTask topology1 %) eds))])) |
| cpu-avail->used (into [] |
| (for [[super eds] super->eds] |
| [(.getTotalCPU super) (sum (map #(.getTotalCpuReqTask topology1 %) eds))]))] |
| ;; 4 slots on 1 machine, all executors assigned |
| (is (= 2 (.size assigned-slots))) ;; executor0 resides one one worker (on one), executor1 and executor2 on another worker (on the other node) |
| (is (= 2 (.size (into #{} (for [slot assigned-slots] (.getNodeId slot)))))) |
| (is (= 3 (.size executors))) |
| ;; make sure resource (mem/cpu) assigned equals to resource specified |
| (is (< (Math/abs (- 600.0 (first assigned-ed-mem))) epsilon)) |
| (is (< (Math/abs (- 1200.0 (second assigned-ed-mem))) epsilon)) |
| (is (< (Math/abs (- 1200.0 (last assigned-ed-mem))) epsilon)) |
| (is (< (Math/abs (- 100.0 (first assigned-ed-cpu))) epsilon)) |
| (is (< (Math/abs (- 250.0 (second assigned-ed-cpu))) epsilon)) |
| (is (< (Math/abs (- 250.0 (last assigned-ed-cpu))) epsilon)) |
| (doseq [[avail used] mem-avail->used] ;; for each node, assigned mem smaller than total |
| (is (>= avail used))) |
| (doseq [[avail used] cpu-avail->used] ;; for each node, assigned cpu smaller than total |
| (is (>= avail used)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1"))))) |
| |
| (deftest test-scheduling-resilience |
| (let [supers (gen-supervisors 2 2) |
| builder1 (TopologyBuilder.) |
| _ (.setSpout builder1 "spout1" (TestWordSpout.) 2) |
| storm-topology1 (.createTopology builder1) |
| topology1 (TopologyDetails. "topology1" |
| {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology1 |
| 3 ;; three workers to hold three executors |
| (mk-ed-map [["spout1" 0 3]])) |
| builder2 (TopologyBuilder.) |
| _ (.setSpout builder2 "spout2" (TestWordSpout.) 2) |
| storm-topology2 (.createTopology builder2) |
| topology2 (TopologyDetails. "topology2" |
| {TOPOLOGY-NAME "topology-name-2" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 1280.0 ;; large enough thus two eds can not be fully assigned to one node |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology2 |
| 2 ;; two workers, each holds one executor and resides on one node |
| (mk-ed-map [["spout2" 0 2]])) |
| scheduler (ResourceAwareScheduler.)] |
| |
| (testing "When a worker fails, RAS does not alter existing assignments on healthy workers" |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology2])) |
| _ (.schedule scheduler topologies cluster) |
| assignment (.getAssignmentById cluster "topology2") |
| failed-worker (first (vec (.getSlots assignment))) ;; choose a worker to mock as failed |
| ed->slot (.getExecutorToSlot assignment) |
| failed-eds (.get (reverse-map ed->slot) failed-worker) |
| _ (doseq [ed failed-eds] (.remove ed->slot ed)) ;; remove executor details assigned to the worker |
| copy-old-mapping (HashMap. ed->slot) |
| healthy-eds (.keySet copy-old-mapping) |
| _ (.schedule scheduler topologies cluster) |
| new-assignment (.getAssignmentById cluster "topology2") |
| new-ed->slot (.getExecutorToSlot new-assignment)] |
| ;; for each executor that was scheduled on healthy workers, their slots should remain unchanged after a new scheduling |
| (doseq [ed healthy-eds] |
| (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2"))))) |
| |
| (testing "When a supervisor fails, RAS does not alter existing assignments" |
| (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" |
| {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 0) ;; worker 0 on the failed super |
| (ExecutorDetails. 1 1) (WorkerSlot. "id0" 1) ;; worker 1 on the failed super |
| (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; worker 2 on the health super |
| cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1])) |
| assignment (.getAssignmentById cluster "topology1") |
| ed->slot (.getExecutorToSlot assignment) |
| copy-old-mapping (HashMap. ed->slot) |
| existing-eds (.keySet copy-old-mapping) ;; all the three eds on three workers |
| new-cluster (Cluster. (nimbus/standalone-nimbus) |
| (dissoc supers "id0") ;; mock the super0 as a failed supervisor |
| (.getAssignments cluster) |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| _ (.schedule scheduler topologies new-cluster) ;; the actual schedule for this topo will not run since it is fully assigned |
| new-assignment (.getAssignmentById new-cluster "topology1") |
| new-ed->slot (.getExecutorToSlot new-assignment)] |
| (doseq [ed existing-eds] |
| (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap new-cluster) "topology1"))))) |
| |
| (testing "When a supervisor and a worker on it fails, RAS does not alter existing assignments" |
| (let [existing-assignments {"topology1" (SchedulerAssignmentImpl. "topology1" |
| {(ExecutorDetails. 0 0) (WorkerSlot. "id0" 1) ;; the worker to orphan |
| (ExecutorDetails. 1 1) (WorkerSlot. "id0" 2) ;; the worker to kill |
| (ExecutorDetails. 2 2) (WorkerSlot. "id1" 1)})} ;; the healthy worker |
| cluster (Cluster. (nimbus/standalone-nimbus) supers existing-assignments |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1])) |
| assignment (.getAssignmentById cluster "topology1") |
| ed->slot (.getExecutorToSlot assignment) |
| _ (.remove ed->slot (ExecutorDetails. 1 1)) ;; delete one worker of super0 (failed) from topo1 assignment to enable actual schedule for testing |
| copy-old-mapping (HashMap. ed->slot) |
| existing-eds (.keySet copy-old-mapping) ;; namely the two eds on the orphaned worker and the healthy worker |
| new-cluster (Cluster. (nimbus/standalone-nimbus) |
| (dissoc supers "id0") ;; mock the super0 as a failed supervisor |
| (.getAssignments cluster) |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| _ (.schedule scheduler topologies new-cluster) |
| new-assignment (.getAssignmentById new-cluster "topology1") |
| new-ed->slot (.getExecutorToSlot new-assignment)] |
| (doseq [ed existing-eds] |
| (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed)))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap new-cluster) "topology1"))))) |
| |
| (testing "Scheduling a new topology does not disturb other assignments unnecessarily" |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1])) |
| _ (.schedule scheduler topologies cluster) |
| assignment (.getAssignmentById cluster "topology1") |
| ed->slot (.getExecutorToSlot assignment) |
| copy-old-mapping (HashMap. ed->slot) |
| new-topologies (Topologies. (to-top-map [topology1 topology2])) ;; a second topology joins |
| _ (.schedule scheduler new-topologies cluster) |
| new-assignment (.getAssignmentById cluster "topology1") |
| new-ed->slot (.getExecutorToSlot new-assignment)] |
| (doseq [ed (.keySet copy-old-mapping)] |
| (is (.equals (.get copy-old-mapping ed) (.get new-ed->slot ed)))) ;; the assignment for topo1 should not change |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1"))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2"))))))) |
| |
| ;; Automated tests for heterogeneous cluster |
| (deftest test-heterogeneous-cluster |
| (let [supers (into {} (for [super [(SupervisorDetails. (str "id" 0) (str "host" 0) (list ) |
| (map int (list 1 2 3 4)) |
| {Config/SUPERVISOR_MEMORY_CAPACITY_MB 4096.0 |
| Config/SUPERVISOR_CPU_CAPACITY 800.0}) |
| (SupervisorDetails. (str "id" 1) (str "host" 1) (list ) |
| (map int (list 1 2 3 4)) |
| {Config/SUPERVISOR_MEMORY_CAPACITY_MB 1024.0 |
| Config/SUPERVISOR_CPU_CAPACITY 200.0})]] |
| {(.getId super) super})) |
| builder1 (TopologyBuilder.) ;; topo1 has one single huge task that can not be handled by the small-super |
| _ (doto (.setSpout builder1 "spout1" (TestWordSpout.) 1) |
| (.setMemoryLoad 2000.0 48.0) |
| (.setCPULoad 300.0)) |
| storm-topology1 (.createTopology builder1) |
| topology1 (TopologyDetails. "topology1" |
| {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology1 |
| 1 |
| (mk-ed-map [["spout1" 0 1]])) |
| builder2 (TopologyBuilder.) ;; topo2 has 4 large tasks |
| _ (doto (.setSpout builder2 "spout2" (TestWordSpout.) 4) |
| (.setMemoryLoad 500.0 12.0) |
| (.setCPULoad 100.0)) |
| storm-topology2 (.createTopology builder2) |
| topology2 (TopologyDetails. "topology2" |
| {TOPOLOGY-NAME "topology-name-2" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology2 |
| 2 |
| (mk-ed-map [["spout2" 0 4]])) |
| builder3 (TopologyBuilder.) ;; topo3 has 4 medium tasks, launching topo 1-3 together requires the same mem as the cluster's mem capacity (5G) |
| _ (doto (.setSpout builder3 "spout3" (TestWordSpout.) 4) |
| (.setMemoryLoad 200.0 56.0) |
| (.setCPULoad 20.0)) |
| storm-topology3 (.createTopology builder3) |
| topology3 (TopologyDetails. "topology3" |
| {TOPOLOGY-NAME "topology-name-3" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology3 |
| 2 |
| (mk-ed-map [["spout3" 0 4]])) |
| builder4 (TopologyBuilder.) ;; topo4 has 12 small tasks, each's mem req does not exactly divide a node's mem capacity |
| _ (doto (.setSpout builder4 "spout4" (TestWordSpout.) 2) |
| (.setMemoryLoad 100.0 0.0) |
| (.setCPULoad 30.0)) |
| storm-topology4 (.createTopology builder4) |
| topology4 (TopologyDetails. "topology4" |
| {TOPOLOGY-NAME "topology-name-4" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology4 |
| 2 |
| (mk-ed-map [["spout4" 0 12]])) |
| builder5 (TopologyBuilder.) ;; topo5 has 40 small tasks, it should be able to exactly use up both the cpu and mem in teh cluster |
| _ (doto (.setSpout builder5 "spout5" (TestWordSpout.) 40) |
| (.setMemoryLoad 100.0 28.0) |
| (.setCPULoad 25.0)) |
| storm-topology5 (.createTopology builder5) |
| topology5 (TopologyDetails. "topology5" |
| {TOPOLOGY-NAME "topology-name-5" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 8192.0} |
| storm-topology5 |
| 2 |
| (mk-ed-map [["spout5" 0 40]])) |
| epsilon 0.000001 |
| topologies (Topologies. (to-top-map [topology1 topology2])) |
| scheduler (ResourceAwareScheduler.)] |
| |
| (testing "Launch topo 1-3 together, it should be able to use up either mem or cpu resource due to exact division" |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1 topology2 topology3])) |
| _ (.schedule scheduler topologies cluster) |
| super->mem-usage (get-super->mem-usage cluster topologies) |
| super->cpu-usage (get-super->cpu-usage cluster topologies)] |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1"))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2"))) |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology3"))) |
| (doseq [super (.values supers)] |
| (let [mem-avail (.getTotalMemory super) |
| mem-used (.get super->mem-usage super) |
| cpu-avail (.getTotalCPU super) |
| cpu-used (.get super->cpu-usage super)] |
| (is (or (<= (Math/abs (- mem-avail mem-used)) epsilon) |
| (<= (Math/abs (- cpu-avail cpu-used)) epsilon))))))) |
| |
| (testing "Launch topo 1, 2 and 4, they together request a little more mem than available, so one of the 3 topos will not be scheduled" |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology1 topology2 topology3])) |
| _ (.schedule scheduler topologies cluster) |
| scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology1")) 1 0) |
| scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology2")) 1 0)) |
| scheduled-topos (+ scheduled-topos (if (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology4")) 1 0))] |
| (is (= scheduled-topos 2)))) ;; only 2 topos will get (fully) scheduled |
| |
| (testing "Launch topo5 only, both mem and cpu should be exactly used up" |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| topologies (Topologies. (to-top-map [topology5])) |
| _ (.schedule scheduler topologies cluster) |
| super->mem-usage (get-super->mem-usage cluster topologies) |
| super->cpu-usage (get-super->cpu-usage cluster topologies)] |
| (is (= "Fully Scheduled" (.get (.getStatusMap cluster) "topology5"))) |
| (doseq [super (.values supers)] |
| (let [mem-avail (.getTotalMemory super) |
| mem-used (.get super->mem-usage super) |
| cpu-avail (.getTotalCPU ^SupervisorDetails super) |
| cpu-used (.get super->cpu-usage super)] |
| (is (and (<= (Math/abs (- mem-avail mem-used)) epsilon) |
| (<= (Math/abs (- cpu-avail cpu-used)) epsilon))))))))) |
| |
| (deftest test-topology-worker-max-heap-size |
| (let [supers (gen-supervisors 2 2)] |
| (testing "test if RAS will spread executors across mulitple workers based on the set limit for a worker used by the topology") |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| scheduler (ResourceAwareScheduler.) |
| builder1 (TopologyBuilder.) |
| _ (.setSpout builder1 "spout1" (TestWordSpout.) 2) |
| storm-topology1 (.createTopology builder1) |
| topology1 (TopologyDetails. "topology1" |
| {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0} |
| storm-topology1 |
| 1 |
| (mk-ed-map [["spout1" 0 4]])) |
| topologies (Topologies. (to-top-map [topology1]))] |
| (.schedule scheduler topologies cluster) |
| (is (= (.get (.getStatusMap cluster) "topology1") "Fully Scheduled")) |
| (is (= (.getAssignedNumWorkers cluster topology1) 4))) |
| (testing "test when no more workers are available due to topology worker max heap size limit but there is memory is still available") |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| scheduler (ResourceAwareScheduler.) |
| builder1 (TopologyBuilder.) |
| _ (.setSpout builder1 "spout1" (TestWordSpout.) 2) |
| storm-topology1 (.createTopology builder1) |
| topology1 (TopologyDetails. "topology1" |
| {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 128.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0} |
| storm-topology1 |
| 1 |
| (mk-ed-map [["spout1" 0 5]])) |
| topologies (Topologies. (to-top-map [topology1]))] |
| (.schedule scheduler topologies cluster) |
| ;;spout1 is going to contain 5 executors that needs scheduling. Each of those executors has a memory requirement of 128.0 MB |
| ;;The cluster contains 4 free WorkerSlots. For this topolology each worker is limited to a max heap size of 128.0 |
| ;;Thus, one executor not going to be able to get scheduled thus failing the scheduling of this topology and no executors of this topology will be scheduleded |
| (is (= (.size (.getUnassignedExecutors cluster topology1)) 5)) |
| (is (= (.get (.getStatusMap cluster) "topology1") "Unsuccessful in scheduling"))) |
| |
| (let [cluster (Cluster. (nimbus/standalone-nimbus) supers {} |
| {STORM-NETWORK-TOPOGRAPHY-PLUGIN |
| "backtype.storm.networktopography.DefaultRackDNSToSwitchMapping"}) |
| cluster (LocalCluster.) |
| builder1 (TopologyBuilder.) |
| _ (.setSpout builder1 "spout1" (TestWordSpout.) 2) |
| storm-topology1 (.createTopology builder1) |
| conf {TOPOLOGY-NAME "topology-name-1" |
| TOPOLOGY-SUBMITTER-USER "userC" |
| TOPOLOGY-COMPONENT-RESOURCES-ONHEAP-MEMORY-MB 129.0 |
| TOPOLOGY-COMPONENT-RESOURCES-OFFHEAP-MEMORY-MB 0.0 |
| TOPOLOGY-COMPONENT-CPU-PCORE-PERCENT 10.0 |
| TOPOLOGY-WORKER-MAX-HEAP-SIZE-MB 128.0} |
| topology1 (TopologyDetails. "topology1" |
| conf |
| storm-topology1 |
| 1 |
| (mk-ed-map [["spout1" 0 5]])) |
| topologies (Topologies. (to-top-map [topology1]))] |
| (is (thrown? IllegalArgumentException |
| (StormSubmitter/submitTopologyWithProgressBar "test" conf storm-topology1))) |
| |
| ))) |