| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| (ns backtype.storm.scheduler.DefaultScheduler |
| (:use [backtype.storm util config]) |
| (:require [backtype.storm.scheduler.EvenScheduler :as EvenScheduler]) |
| (:import [backtype.storm.scheduler IScheduler Topologies |
| Cluster TopologyDetails WorkerSlot SchedulerAssignment |
| EvenScheduler ExecutorDetails]) |
| (:gen-class |
| :implements [backtype.storm.scheduler.IScheduler])) |
| |
(defn- bad-slots
  "Returns WorkerSlot objects for the entries of existing-slots that should
  be released.

  existing-slots is a map keyed by [node port] pairs whose values are
  collections of executors. The result of
  (integer-divided num-executors num-workers) is used here as a map from an
  executor count to how many slots holding exactly that many executors may
  still be kept. Slots are visited in map order; a slot is kept while the
  quota for its executor count is positive, and every other slot is
  reported as bad. When num-workers is 0 an empty list is returned."
  [existing-slots num-executors num-workers]
  (if (not= 0 num-workers)
    (let [initial-quota (integer-divided num-executors num-workers)
          ;; Single pass carrying [kept-keys remaining-quota].
          [kept-keys _] (reduce
                          (fn [[kept quota] [node+port executor-list]]
                            (let [executor-count (count executor-list)]
                              (if (pos? (get quota executor-count 0))
                                [(conj kept node+port)
                                 (update-in quota [executor-count] dec)]
                                [kept quota])))
                          [[] initial-quota]
                          (seq existing-slots))
          rejected (apply dissoc existing-slots kept-keys)]
      (for [[node port] (keys rejected)]
        (WorkerSlot. node port)))
    '()))
| |
(defn slots-can-reassign
  "Lazily filters slots, a collection of [node port] pairs, keeping only the
  pairs for which all of the following hold:
    - the cluster does not report the node as blacklisted,
    - the cluster returns a supervisor for the node id, and
    - that supervisor's getAllPorts contains the port."
  [^Cluster cluster slots]
  (let [reassignable? (fn [[node port]]
                        (when-not (.isBlackListed cluster node)
                          (when-let [supervisor (.getSupervisorById cluster node)]
                            (.contains (.getAllPorts supervisor) (int port)))))]
    (filter reassignable? slots)))
| |
(defn -prepare
  "gen-class implementation of prepare. Intentionally a no-op: the arguments
  are ignored and nil is returned."
  [this conf]
  nil)
| |
(defn default-schedule
  "For every topology the cluster reports as needing scheduling:
    1. work out how many slots to use: the smaller of the topology's
       requested worker count and the number of reassignable plus
       available slots;
    2. if more slots should be used than are currently alive-assigned, or
       the alive executors differ from the topology's executors, free the
       slots selected by bad-slots (otherwise free nothing);
    3. hand the single topology to EvenScheduler/schedule-topologies-evenly."
  [^Topologies topologies ^Cluster cluster]
  (doseq [^TopologyDetails topology (.needsSchedulingTopologies cluster topologies)]
    ;; Everything below is recomputed per topology because freeing and
    ;; scheduling in an earlier iteration changes the cluster state.
    (let [topology-id (.getId topology)
          available-slots (for [slot (.getAvailableSlots cluster)]
                            [(.getNodeId slot) (.getPort slot)])
          all-executors (set (for [executor (.getExecutors topology)]
                               [(.getStartTask executor) (.getEndTask executor)]))
          alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
          alive-executors (set (apply concat (vals alive-assigned)))
          can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
          total-slots-to-use (min (.getNumWorkers topology)
                                  (+ (count can-reassign-slots)
                                     (count available-slots)))
          ;; True when no extra slots are wanted and every executor is
          ;; already alive-assigned, i.e. nothing has to be freed.
          assignment-intact? (and (<= total-slots-to-use (count alive-assigned))
                                  (= alive-executors all-executors))
          slots-to-free (if assignment-intact?
                          []
                          (bad-slots alive-assigned
                                     (count all-executors)
                                     total-slots-to-use))]
      (.freeSlots cluster slots-to-free)
      (EvenScheduler/schedule-topologies-evenly
        (Topologies. {topology-id topology})
        cluster))))
| |
(defn -schedule
  "gen-class implementation of schedule: delegates to default-schedule with
  the given topologies and cluster and returns its result."
  [this ^Topologies topologies ^Cluster cluster]
  (default-schedule topologies cluster))