blob: 25ba03b1b85849db2d419665efec5c2bfc1d2dab [file] [log] [blame]
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns backtype.storm.scheduler.EvenScheduler
(:use [backtype.storm util log config])
(:require [clojure.set :as set])
(:import [backtype.storm.scheduler IScheduler Topologies
Cluster TopologyDetails WorkerSlot ExecutorDetails])
(:gen-class
:implements [backtype.storm.scheduler.IScheduler]))
(defn sort-slots
  "Orders ALL-SLOTS for placement. Slots are bucketed by their first element
  (the node id, for the [node-id port] vectors built by schedule-topology).
  Buckets are ordered from most slots to fewest, and the buckets are merged
  with interleave-all from backtype.storm.util (presumably round-robin across
  nodes; see that helper to confirm)."
  [all-slots]
  (->> all-slots
       (group-by first)
       vals
       (sort-by count >)
       (apply interleave-all)))
(defn get-alive-assigned-node+port->executors
  "Looks up CLUSTER's existing assignment for TOPOLOGY-ID.
  Each executor is turned into a [start-task end-task] vector and each slot
  into a [node-id port] vector. The resulting executor->slot map is then
  inverted with reverse-map from backtype.storm.util, keying the result by
  slot. When there is no existing assignment, an empty map is inverted instead."
  [cluster topology-id]
  (let [assignment (.getAssignmentById cluster topology-id)
        exec->slot (if assignment
                     (.getExecutorToSlot assignment)
                     {})
        ->entry (fn [[^ExecutorDetails exec ^WorkerSlot slot]]
                  [[(.getStartTask exec) (.getEndTask exec)]
                   [(.getNodeId slot) (.getPort slot)]])]
    (reverse-map (into {} (map ->entry exec->slot)))))
(defn- schedule-topology
  "Computes placements for the executors of TOPOLOGY that CLUSTER does not
  already have assigned.

  Returns a map of [start-task end-task] -> [node-id port]. It contains only
  the newly placed executors; executors found in the existing assignment are
  left out.

  Slot budget: the topology may use at most (.getNumWorkers topology) slots,
  counting slots it already occupies, and never more than are free plus
  occupied. Free slots are taken in sort-slots order up to that budget, and
  the unassigned executors, in sorted order, are dealt round-robin onto them.
  Logs the available slots whenever something is assigned."
  [^TopologyDetails topology ^Cluster cluster]
  (let [topology-id (.getId topology)
        available-slots (->> (.getAvailableSlots cluster)
                             (map #(vector (.getNodeId %) (.getPort %))))
        all-executors (->> topology
                           .getExecutors
                           (map #(vector (.getStartTask %) (.getEndTask %)))
                           set)
        alive-assigned (get-alive-assigned-node+port->executors cluster topology-id)
        total-slots-to-use (min (.getNumWorkers topology)
                                (+ (count available-slots) (count alive-assigned)))
        ;; The difference can be zero or negative when the topology already
        ;; holds its full budget; `take` then simply yields no new slots.
        reassign-slots (take (- total-slots-to-use (count alive-assigned))
                             (sort-slots available-slots))
        reassign-executors (sort (set/difference all-executors
                                                 (set (apply concat (vals alive-assigned)))))
        ;; Round-robin: executor i goes to slot (i mod |reassign-slots|).
        ;; `cycle` of an empty collection is empty, so when no slots were
        ;; chosen zipmap stops immediately and nothing is assigned. No
        ;; explicit bound on the repeated slot sequence is required.
        reassignment (zipmap reassign-executors (cycle reassign-slots))]
    (when-not (empty? reassignment)
      (log-message "Available slots: " (pr-str available-slots)))
    reassignment))
(defn schedule-topologies-evenly
  "For every topology CLUSTER reports as needing scheduling, computes new
  executor placements with schedule-topology and records them in CLUSTER via
  .assign, one WorkerSlot at a time. Topologies are processed sequentially:
  each topology's assignments are written before the next one is scheduled,
  so later topologies see the slots consumed by earlier ones."
  [^Topologies topologies ^Cluster cluster]
  (doseq [^TopologyDetails topology (.needsSchedulingTopologies cluster topologies)]
    (let [topology-id (.getId topology)
          slot->task-ranges (reverse-map (schedule-topology topology cluster))]
      (doseq [[[node port] task-ranges] slot->task-ranges]
        (.assign cluster
                 (WorkerSlot. node port)
                 topology-id
                 (for [[start-task end-task] task-ranges]
                   (ExecutorDetails. start-task end-task)))))))
(defn -prepare
  "IScheduler.prepare implementation. This scheduler keeps no configuration,
  so CONF is ignored and nothing is done."
  [this conf]
  nil)
(defn -schedule
  "IScheduler.schedule implementation: hands TOPOLOGIES and CLUSTER to
  schedule-topologies-evenly, which records its placements on CLUSTER."
  [this ^Topologies topologies ^Cluster cluster]
  (schedule-topologies-evenly topologies
                              cluster))