| ;; Licensed to the Apache Software Foundation (ASF) under one |
| ;; or more contributor license agreements. See the NOTICE file |
| ;; distributed with this work for additional information |
| ;; regarding copyright ownership. The ASF licenses this file |
| ;; to you under the Apache License, Version 2.0 (the |
| ;; "License"); you may not use this file except in compliance |
| ;; with the License. You may obtain a copy of the License at |
| ;; |
| ;; http://www.apache.org/licenses/LICENSE-2.0 |
| ;; |
| ;; Unless required by applicable law or agreed to in writing, software |
| ;; distributed under the License is distributed on an "AS IS" BASIS, |
| ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| ;; See the License for the specific language governing permissions and |
| ;; limitations under the License. |
| (ns backtype.storm.converter |
| (:import [backtype.storm.generated SupervisorInfo NodeInfo Assignment |
| StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions |
| TopologyActionOptions DebugOptions]) |
| (:use [backtype.storm util stats log]) |
| (:require [backtype.storm.daemon [common :as common]])) |
| |
| (defn thriftify-supervisor-info [supervisor-info] |
| (doto (SupervisorInfo.) |
| (.set_time_secs (long (:time-secs supervisor-info))) |
| (.set_hostname (:hostname supervisor-info)) |
| (.set_assignment_id (:assignment-id supervisor-info)) |
| (.set_used_ports (map long (:used-ports supervisor-info))) |
| (.set_meta (map long (:meta supervisor-info))) |
| (.set_scheduler_meta (:scheduler-meta supervisor-info)) |
| (.set_uptime_secs (long (:uptime-secs supervisor-info))) |
| (.set_version (:version supervisor-info)) |
| )) |
| |
| (defn clojurify-supervisor-info [^SupervisorInfo supervisor-info] |
| (if supervisor-info |
| (backtype.storm.daemon.common.SupervisorInfo. |
| (.get_time_secs supervisor-info) |
| (.get_hostname supervisor-info) |
| (.get_assignment_id supervisor-info) |
| (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info))) |
| (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info))) |
| (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info))) |
| (.get_uptime_secs supervisor-info) |
| (.get_version supervisor-info)))) |
| |
| (defn thriftify-assignment [assignment] |
| (doto (Assignment.) |
| (.set_master_code_dir (:master-code-dir assignment)) |
| (.set_node_host (:node->host assignment)) |
| (.set_executor_node_port (map-val |
| (fn [node+port] |
| (NodeInfo. (first node+port) (set (map long (rest node+port))))) |
| (map-key #(map long %) |
| (:executor->node+port assignment)))) |
| (.set_executor_start_time_secs |
| (map-val |
| long |
| (map-key #(map long %) |
| (:executor->start-time-secs assignment)))))) |
| |
| (defn clojurify-executor->node_port [executor->node_port] |
| (into {} |
| (map-val |
| (fn [nodeInfo] |
| (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..] |
| (map-key |
| (fn [list-of-executors] |
| (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable. |
| executor->node_port)))) |
| |
| (defn clojurify-assignment [^Assignment assignment] |
| (if assignment |
| (backtype.storm.daemon.common.Assignment. |
| (.get_master_code_dir assignment) |
| (into {} (.get_node_host assignment)) |
| (clojurify-executor->node_port (into {} (.get_executor_node_port assignment))) |
| (map-key (fn [executor] (into [] executor)) |
| (into {} (.get_executor_start_time_secs assignment)))))) |
| |
| (defn convert-to-symbol-from-status [status] |
| (condp = status |
| TopologyStatus/ACTIVE {:type :active} |
| TopologyStatus/INACTIVE {:type :inactive} |
| TopologyStatus/REBALANCING {:type :rebalancing} |
| TopologyStatus/KILLED {:type :killed} |
| nil)) |
| |
| (defn- convert-to-status-from-symbol [status] |
| (if status |
| (condp = (:type status) |
| :active TopologyStatus/ACTIVE |
| :inactive TopologyStatus/INACTIVE |
| :rebalancing TopologyStatus/REBALANCING |
| :killed TopologyStatus/KILLED |
| nil))) |
| |
| (defn clojurify-rebalance-options [^RebalanceOptions rebalance-options] |
| (-> {:action :rebalance} |
| (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options))) |
| (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options))) |
| (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options)))))) |
| |
| (defn thriftify-rebalance-options [rebalance-options] |
| (if rebalance-options |
| (let [thrift-rebalance-options (RebalanceOptions.)] |
| (if (:delay-secs rebalance-options) |
| (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options)))) |
| (if (:num-workers rebalance-options) |
| (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options)))) |
| (if (:component->executors rebalance-options) |
| (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options)))) |
| thrift-rebalance-options))) |
| |
| (defn clojurify-kill-options [^KillOptions kill-options] |
| (-> {:action :kill} |
| (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options))))) |
| |
| (defn thriftify-kill-options [kill-options] |
| (if kill-options |
| (let [thrift-kill-options (KillOptions.)] |
| (if (:delay-secs kill-options) |
| (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options)))) |
| thrift-kill-options))) |
| |
| (defn thriftify-topology-action-options [storm-base] |
| (if (:topology-action-options storm-base) |
| (let [ topology-action-options (:topology-action-options storm-base) |
| action (:action topology-action-options) |
| thrift-topology-action-options (TopologyActionOptions.)] |
| (if (= action :kill) |
| (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options))) |
| (if (= action :rebalance) |
| (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options))) |
| thrift-topology-action-options))) |
| |
| (defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options] |
| (if topology-action-options |
| (or (and (.is_set_kill_options topology-action-options) |
| (clojurify-kill-options |
| (.get_kill_options topology-action-options))) |
| (and (.is_set_rebalance_options topology-action-options) |
| (clojurify-rebalance-options |
| (.get_rebalance_options topology-action-options)))))) |
| |
| (defn clojurify-debugoptions [^DebugOptions options] |
| (if options |
| { |
| :enable (.is_enable options) |
| :samplingpct (.get_samplingpct options) |
| } |
| )) |
| |
| (defn thriftify-debugoptions [options] |
| (doto (DebugOptions.) |
| (.set_enable (get options :enable false)) |
| (.set_samplingpct (get options :samplingpct 10)))) |
| |
| (defn thriftify-storm-base [storm-base] |
| (doto (StormBase.) |
| (.set_name (:storm-name storm-base)) |
| (.set_launch_time_secs (int (:launch-time-secs storm-base))) |
| (.set_status (convert-to-status-from-symbol (:status storm-base))) |
| (.set_num_workers (int (:num-workers storm-base))) |
| (.set_component_executors (map-val int (:component->executors storm-base))) |
| (.set_owner (:owner storm-base)) |
| (.set_topology_action_options (thriftify-topology-action-options storm-base)) |
| (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base))) |
| (.set_component_debug (map-val thriftify-debugoptions (:component->debug storm-base))))) |
| |
| (defn clojurify-storm-base [^StormBase storm-base] |
| (if storm-base |
| (backtype.storm.daemon.common.StormBase. |
| (.get_name storm-base) |
| (.get_launch_time_secs storm-base) |
| (convert-to-symbol-from-status (.get_status storm-base)) |
| (.get_num_workers storm-base) |
| (into {} (.get_component_executors storm-base)) |
| (.get_owner storm-base) |
| (clojurify-topology-action-options (.get_topology_action_options storm-base)) |
| (convert-to-symbol-from-status (.get_prev_status storm-base)) |
| (map-val clojurify-debugoptions (.get_component_debug storm-base))))) |
| |
| (defn thriftify-stats [stats] |
| (if stats |
| (map-val thriftify-executor-stats |
| (map-key #(ExecutorInfo. (int (first %1)) (int (last %1))) |
| stats)) |
| {})) |
| |
| (defn clojurify-stats [stats] |
| (if stats |
| (map-val clojurify-executor-stats |
| (map-key (fn [x] (list (.get_task_start x) (.get_task_end x))) |
| stats)) |
| {})) |
| |
| (defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb] |
| (if worker-hb |
| {:storm-id (.get_storm_id worker-hb) |
| :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb))) |
| :uptime (.get_uptime_secs worker-hb) |
| :time-secs (.get_time_secs worker-hb) |
| } |
| {})) |
| |
| (defn thriftify-zk-worker-hb [worker-hb] |
| (if (not-empty (filter second (:executor-stats worker-hb))) |
| (doto (ClusterWorkerHeartbeat.) |
| (.set_uptime_secs (:uptime worker-hb)) |
| (.set_storm_id (:storm-id worker-hb)) |
| (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb)))) |
| (.set_time_secs (:time-secs worker-hb))))) |
| |
| (defn clojurify-error [^ErrorInfo error] |
| (if error |
| { |
| :error (.get_error error) |
| :time-secs (.get_error_time_secs error) |
| :host (.get_host error) |
| :port (.get_port error) |
| } |
| )) |
| |
| (defn thriftify-error [error] |
| (doto (ErrorInfo. (:error error) (:time-secs error)) |
| (.set_host (:host error)) |
| (.set_port (:port error)))) |
| |
| (defn thriftify-credentials [credentials] |
| (doto (Credentials.) |
| (.set_creds (if credentials credentials {})))) |
| |
| (defn clojurify-crdentials [^Credentials credentials] |
| (if credentials |
| (into {} (.get_creds credentials)) |
| nil |
| )) |