Merge branch 'STORM-2010'
diff --git a/bin/storm.py b/bin/storm.py
index 0da2ae1..c49ab09 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -338,7 +338,7 @@
get PATH - Get the heartbeat data at PATH
"""
exec_storm_class(
- "org.apache.storm.command.heartbeats",
+ "org.apache.storm.command.Heartbeats",
args=args,
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
deleted file mode 100644
index 625cff7..0000000
--- a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
+++ /dev/null
@@ -1,54 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.command.heartbeats
- (:require [org.apache.storm
- [config :refer :all]
- [log :refer :all]
- [util :refer :all]
- [converter :refer :all]]
- [clojure.string :as string])
- (:import [org.apache.storm.generated ClusterWorkerHeartbeat]
- [org.apache.storm.utils Utils ConfigUtils]
- [org.apache.storm.cluster ZKStateStorage ClusterStateContext ClusterUtils]
- [org.apache.storm.stats StatsUtil])
- (:gen-class))
-
-(defn -main [command path & args]
- (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
- cluster (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.))]
- (println "Command: [" command "]")
- (condp = command
- "list"
- (let [message (clojure.string/join " \n" (.get_worker_hb_children cluster path false))]
- (log-message "list " path ":\n"
- message "\n"))
- "get"
- (log-message
- (if-let [hb (.get_worker_hb cluster path false)]
- (StatsUtil/convertZkWorkerHb
- (Utils/deserialize
- hb
- ClusterWorkerHeartbeat))
- "Nothing"))
-
- (log-message "Usage: heartbeats [list|get] path"))
-
- (try
- (.close cluster)
- (catch Exception e
- (log-message "Caught exception: " e " on close."))))
- (System/exit 0))
-
diff --git a/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
new file mode 100644
index 0000000..2a149cb
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/command/Heartbeats.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import com.google.common.base.Joiner;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.generated.ClusterWorkerHeartbeat;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class Heartbeats {
+ private static final Logger LOG = LoggerFactory.getLogger(Heartbeats.class);
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ throw new IllegalArgumentException("Command and path arguments must be provided.");
+ }
+
+ String command = args[0];
+ String path = args[1];
+
+ Map<String, Object> conf = ConfigUtils.readStormConfig();
+ IStateStorage cluster = ClusterUtils.mkStateStorage(conf, conf, null, new ClusterStateContext());
+
+ LOG.info("Command: [{}]", command);
+
+ switch (command) {
+ case "list":
+ handleListCommand(cluster, path);
+ break;
+
+ case "get":
+ handleGetCommand(cluster, path);
+ break;
+
+ default:
+ LOG.info("Usage: heartbeats [list|get] path");
+ }
+
+ try {
+ cluster.close();
+ } catch (Exception e) {
+ LOG.info("Caught exception: {} on close.", e);
+ }
+
+ // force process to be terminated
+ System.exit(0);
+ }
+
+ private static void handleListCommand(IStateStorage cluster, String path) {
+ String message = Joiner.on("\n").join(cluster.get_worker_hb_children(path, false));
+ LOG.info("list {}:\n{}\n", path, message);
+ }
+
+ private static void handleGetCommand(IStateStorage cluster, String path) {
+ String message;
+ byte[] hb = cluster.get_worker_hb(path, false);
+ if (hb != null) {
+ Map<String, Object> heartbeatMap = StatsUtil.convertZkWorkerHb(Utils.deserialize(hb, ClusterWorkerHeartbeat.class));
+ message = JSONValue.toJSONString(heartbeatMap);
+ } else {
+ message = "No Heartbeats found";
+ }
+ LOG.info(message);
+ }
+}